wiki/concepts/AI-Agent-Architecture-Konzept.md
2026-03-15 18:14:21 +01:00

57 KiB
Raw Blame History

Konzept: AI Agent Architecture & Unified Workspace

Poweron Platform -- Zentrales AI Handling und Editor-UI Status: Konzept / Version 2.0 / März 2026


1. Ausgangslage

Die Plattform hat vier isolierte Systeme für AI-Interaktionen, jedes mit eigenen Schwächen:

System Stärke Schwäche
serviceAi (Pipeline) Modell-Fallback, Billing, Dokumentenextraktion Rigide Pipeline (extract→structure→fill), N×M AI-Call-Explosion, kein Caching
Codeeditor ReAct-Loop mit Tool-Use, Agent-Modus Nur read-only Tools, eigenes UI, nur Codeeditor-Scope
Chatbot LangGraph, SQL/Tavily-Integration Kein File-Handling, keine Dokumentbearbeitung, eigenes UI
Chatplayground Workflow-Management Kein Streaming, kein Voice, eigenes UI

Zusätzliche Defizite:

  • Kein Wissenstransfer: Jeder Workflow startet bei null, selbst wenn dasselbe Dokument schon extrahiert wurde
  • File-System: Flach ohne Ordner, Tags oder FeatureInstance-Zuordnung
  • Keine externen Datenquellen im Chat referenzierbar (SharePoint-Ordner, Drive, etc.)
  • Grosse Dokumente: 500-Seiten-PDFs werden blind vollständig extrahiert (500+ AI-Calls)
  • Kein Container-Handling: ZIP-Files, Ordner und verschachtelte Dokumente werden nicht unterstützt
  • Keine generische Provider-Abstraktion: SharePoint/Outlook je eigene Implementierung, obwohl beides MSFT-Services über eine Connection. Kein Google Drive, FTP etc.
  • 36 Workflow-Actions isoliert: Bestehende Actions (SharePoint, Outlook, Jira, AI) nicht als Agent-Tools nutzbar

2. Zielarchitektur

2.1 Kernidee: Agent statt Pipeline

Bisher: Code entscheidet starr, was passiert:

extractContent() → generateStructure() → fillSections() → render()

Neu: AI entscheidet dynamisch, welche Tools sie braucht:

User Prompt + Files → Agent Loop → Tool Calls → Result

2.2 Gesamtübersicht

graph TB
  subgraph ui["Unified Workspace UI"]
    ChatPanel["Chat / Streaming"]
    FilePanel["File Browser"]
    DataPanel["Data Source Picker"]
    KnowledgePanel["Knowledge Browser"]
  end
  
  subgraph agentSvc["serviceAgent"]
    Loop["Agent Loop\nReAct + native function calling"]
    TR["Tool Registry"]
    CTX["Context Manager\nRAG Retrieval"]
  end
  
  subgraph knowledge["Knowledge Store"]
    SharedL["Shared Layer\nmandateId"]
    InstanceL["Instance Layer\nuserId + featureInstanceId"]
    WorkflowL["Workflow Layer\nworkflowId"]
  end
  
  subgraph tools["Tools"]
    FileT["File Tools"]
    DocT["Document Tools"]
    ConnT["Connection Tools"]
    ExtT["External Tools"]
    WfT["Workflow Tools"]
  end
  
  subgraph infra["Bestehende Infrastruktur"]
    AiSvc["serviceAi\nModell-Fallback + Billing"]
    ExtrSvc["serviceExtraction"]
    Stream["serviceStreaming"]
    Voice["serviceVoice"]
    UCon["UserConnections"]
  end
  
  ui -->|"SSE Stream"| agentSvc
  CTX -->|"RAG"| knowledge
  DocT -->|"Cache"| knowledge
  Loop --> TR
  TR --> tools
  Loop --> CTX
  Loop --> AiSvc
  Loop --> Stream
  DocT --> ExtrSvc
  ConnT --> UCon

3. Agent Core (serviceAgent)

3.1 Aufbau

Neuer Service unter modules/serviceCenter/services/serviceAgent/:

Datei Funktion
mainServiceAgent.py Entry Point: runAgent(prompt, fileIds, toolSet, config)
agentLoop.py ReAct-Loop mit native function calling
toolRegistry.py Tool-Registrierung und Dispatch
conversationManager.py Context-Window-Management
datamodelAgent.py AgentState, ToolDefinition, ToolResult, AgentConfig

ServiceCenter-Registry:

# In serviceCenter/registry.py
IMPORTABLE_SERVICES = {
    ...
    "agent": {
        "module": "modules.serviceCenter.services.serviceAgent.mainServiceAgent",
        "class": "ServiceAgent",
        "dependencies": ["ai", "chat", "extraction", "billing", "streaming"],
        "objectKey": "service.agent"
    }
}

serviceAgent hat die meisten transitiven Dependencies (über aichat, utils, extraction, billing). Die bestehende DI-Kette im ServiceCenter löst dies automatisch auf.

AgentConfig:

class AgentConfig(BaseModel):
    maxRounds: int = 25
    maxCostCHF: Optional[float] = None   # Workflow-Budget-Cap
    entityCacheEnabled: bool = False      # Opt-in Entity Extraction (Extra-AI-Call pro Round)
    toolSet: str = "core"                 # Welches Tool-Set aktiv
    operationType: str = "AGENT"          # Für Modell-Auswahl

3.2 Agent Loop

async def runAgent(prompt, fileIds, toolSet, config, progressCallback):
    state = AgentState(maxRounds=config.maxRounds)
    tools = toolRegistry.getTools(toolSet)
    messages = [buildSystemPrompt(tools), {"role": "user", "content": prompt}]
    
    while state.status == "running" and state.currentRound < state.maxRounds:
        state.currentRound += 1
        
        # Kosten-Budget prüfen
        if config.maxCostCHF and await billingService.getWorkflowCost(workflowId) > config.maxCostCHF:
            state.status = "budgetExceeded"
            yield AgentEvent(type="final", content=_summarizeProgress(state))
            break
        
        response = await aiService.callAi(AiCallRequest(
            messages=messages,
            options=AiCallOptions(operationType=OperationTypeEnum.AGENT),
            tools=tools
        ))
        
        toolCalls = response.toolCalls
        if not toolCalls:
            state.status = "completed"
            yield AgentEvent(type="final", content=response.content)
            break
        
        # Tool-Ausführung: sequential default, parallel nur für readOnly-Tools
        results = await _executeToolCalls(toolCalls, context)
        
        messages.extend(formatToolResults(toolCalls, results))
        yield AgentEvent(type="toolResults", data=results)
    
    # Graceful Degradation bei maxRounds
    if state.currentRound >= state.maxRounds and state.status == "running":
        state.status = "maxRoundsReached"
        yield AgentEvent(type="final", content=_summarizeProgress(state))


async def _executeToolCalls(toolCalls, context):
    """Sequential als Default. Parallel nur für readOnly-Tools (keine Race Conditions)."""
    readOnlyCalls = [tc for tc in toolCalls if toolRegistry.isReadOnly(tc.name)]
    writeCalls = [tc for tc in toolCalls if not toolRegistry.isReadOnly(tc.name)]
    
    results = {}
    if readOnlyCalls:
        readResults = await asyncio.gather(*[
            toolRegistry.dispatch(tc.name, tc.args, context) for tc in readOnlyCalls
        ])
        for tc, result in zip(readOnlyCalls, readResults):
            results[tc.id] = result
    
    for tc in writeCalls:
        results[tc.id] = await toolRegistry.dispatch(tc.name, tc.args, context)
    
    return [results[tc.id] for tc in toolCalls]

3.3 Design-Entscheidungen

  • Native function calling als Standard (OpenAI tool_choice Format) -- zuverlässiger als text-basiertes Parsing
  • Text-basierter Fallback für Modelle ohne function calling Support
  • Nutzt bestehende callAi-Infrastruktur (Modell-Fallback, Billing, Provider-Auswahl) -- damit greifen Provider-RBAC, Model-RBAC und Billing-Checks automatisch
  • AiCallRequest wird erweitert um messages und tools (abwärtskompatibel: bestehende prompt/context/contentParts bleiben)
  • Tool-Ausführung sequential als Default -- parallel nur für explizit als readOnly=True markierte Tools (verhindert Race Conditions bei State-modifizierenden Tools)
  • Kosten-Budget (maxCostCHF) pro Workflow -- Agent stoppt mit Fortschritts-Summary bei Überschreitung
  • Graceful Degradation bei maxRounds -- Agent liefert Summary statt stiller Abbruch

3.4 Fehlerbehandlung

Fehlertyp Strategie
Tool-Fehler (Exception in Tool) Fehler als ToolResult an Agent zurückgeben -- Agent entscheidet über Retry oder alternativen Ansatz
Halluzinierte Tool-Namen Validierung gegen toolRegistry.getTools() -- ungültige Tools werden als Fehler-ToolResult zurückgegeben
Externe API-Fehler (Timeout, 5xx) Retry mit Exponential Backoff (max 3 Versuche), dann Fehler-ToolResult
AI-Call-Fehler Bestehende Fallback-Logik in serviceAi (Modell-Fallback, Provider-Switch)
maxRounds erreicht Graceful Degradation: Agent liefert Summary des bisherigen Fortschritts
Budget überschritten Agent stoppt, liefert Zusammenfassung, User wird informiert
Knowledge Store nicht verfügbar Agent arbeitet ohne RAG-Kontext weiter (degraded mode)

Input-Validierung: Tool-Argumente vom LLM sind unvalidiert. Jedes Tool validiert seine Args vor Weitergabe an darunterliegende Interfaces:

  • Path-Traversal-Schutz (readFile, writeFile): Pfade auf erlaubten Scope beschränken
  • SQL-Injection-Schutz (sqlQuery): Parametrisierte Queries
  • Container-Limits: maxTotalExtractedSize, maxFileCount, Symlink-Blockierung (ZIP-Bomb-Schutz)

3.5 RBAC und Sicherheit

Design-Entscheidung: Keine Tool-Level-RBAC nötig. Die Berechtigungen greifen auf Daten- und Service-Ebene unterhalb der Tools:

Tool Absicherung unterhalb
File-Tools (readFile, writeFile, listFiles) interfaceDbAppgetRecordsetWithRBAC() mit WHERE nach MY/GROUP/ALL
External-Tools (externalBrowse, externalDownload) UserConnection OAuth-Token -- User sieht nur, was sein Account erlaubt
AI-Calls (Agent-Loop) serviceAi_checkBillingBeforeAiCall() → Provider-RBAC + Model-RBAC
Service-Zugriff can_access_service() prüft RESOURCE-ObjectKey

serviceAgent als IMPORTABLE_SERVICE:

serviceAgent wird in der ServiceCenter-Registry registriert mit objectKey = "service.agent". Damit greift can_access_service() und der Zugang zum Agent ist per Rolle steuerbar.

Knowledge Store Shared Layer:

Dokumente mit isShared=True sind mandateweit sichtbar. Neue Tabellen (FileContentIndex, ContentChunk) brauchen Einträge in TABLE_NAMESPACE mit DATA-ObjectKeys, damit getRecordsetWithRBAC() greift. Promote in Shared Layer (isShared=True) erfordert Access-Level ALL.

3.6 Tool Registry

CORE_TOOLS = ["readFile", "writeFile", "listFiles", "searchFiles",
              "extractContent", "summarizeContent", "webSearch"]

Jedes Tool wird mit readOnly-Flag registriert (steuert parallele vs. sequentielle Ausführung):

toolRegistry.register("readFile", readFileTool, readOnly=True)
toolRegistry.register("writeFile", writeFileTool, readOnly=False)
toolRegistry.register("sqlQuery", sqlQueryTool, featureType="chatbot", readOnly=True)
toolRegistry.register("sharePointUpload", spTool, featureType="trustee", readOnly=False)

4. Knowledge Store (RAG)

4.1 Kernprinzip

Persistenter Wissensspeicher mit drei Ebenen. Ein File wird einmal extrahiert und steht danach allen Workflows zur Verfügung. Automatische Indexierung bei Upload.

4.2 Drei-Ebenen-Architektur

graph TB
  subgraph scope["Knowledge Store"]
    direction TB
    
    subgraph shared["Shared Layer -- mandateId"]
      GD["Globale Firmen-Dokumente\nTemplates, Shared Knowledge"]
    end
    
    subgraph instance["Instance Layer -- userId + featureInstanceId"]
      FI["User-Dokumente\nFile-Embeddings\nDocument Indexes"]
    end
    
    subgraph wf["Workflow Layer -- workflowId"]
      WF["Conversation Summary\nEntity Cache\nTool Results"]
    end
    
    shared --> instance
    instance --> wf
  end
Ebene Scope Inhalt Lebensdauer
Shared mandateId Globale Dokumente, Templates Permanent
Instance userId + featureInstanceId Extrahierte Dokumente, Embeddings Permanent pro Workspace
Workflow workflowId Conversation Summaries, Entities, Tool-Ergebnisse Lebt mit dem Workflow

4.3 Automatische Indexierung bei Upload

Jedes hochgeladene File durchläuft eine Background-Pipeline -- komplett ohne AI:

Upload → Container auflösen (rekursiv) → Typ-Extractor pro File → Content-Objekte → Chunking → Embedding → Knowledge Store
async def _onFileUploaded(fileId, userId, featureInstanceId):
    fileData = await getFileData(fileId)
    mimeType = _detectMimeType(fileData)
    
    # 1. Container auflösen (ZIP/Folder → Files), dann pro File extrahieren
    files = await _resolveToFiles(fileId, fileData, mimeType)
    
    for file in files:
        # 2. Typ-Extractor: Content-Objekte extrahieren (kein AI)
        contentIndex = await _extractFileContents(file.id, file.data, file.mimeType)
        
        # 3. Text-Content-Objekte chunken und embedden
        textObjects = [o for o in contentIndex.objects if o.contentType == "text"]
        chunks = _chunkForEmbedding(textObjects, chunkSize=512)
        embeddings = await embeddingService.embed([c.data for c in chunks])
        
        # 4. Im Knowledge Store persistieren (userId + featureInstanceId scoped)
        for chunk, embedding in zip(chunks, embeddings):
            await knowledgeStore.upsertContentChunk(ContentChunk(
                contentObjectId=chunk.id, fileId=file.id,
                userId=userId, featureInstanceId=featureInstanceId,
                data=chunk.data, contextRef=chunk.contextRef,
                embedding=embedding
            ))
    
    await knowledgeStore.updateFileStatus(fileId, "indexed")

Ergebnis: Sekunden bis Minuten nach Upload ist das File für alle Workflows RAG-ready. Nachfolgende Aktivitäten holen den Inhalt direkt aus dem Knowledge Store.

4.4 Datenmodelle (PostgreSQL + pgvector)

class FileContentIndex(BaseModel):
    """Index der Content-Strukturen pro File. Lebt im Instance Layer.
    Wird bei Extraktion erstellt (kein AI)."""
    id: str                          # = fileId
    userId: str
    featureInstanceId: str
    mandateId: str
    isShared: bool = False           # Shared Layer Sichtbarkeit
    fileName: str
    mimeType: str
    containerPath: Optional[str]     # Pfad im Container (z.B. "archiv.zip/folder/report.pdf")
    totalObjects: int                # Anzahl Content-Objekte
    totalSize: int
    structure: Dict[str, Any]        # Strukturübersicht (Pages, Sections, etc.)
    objectSummary: List[Dict]        # Kompakte Übersicht pro Content-Objekt
    extractedAt: float
    status: str                      # pending, extracted, embedding, indexed, failed

class ContentChunk(BaseModel):
    """Persistierter Content-Chunk. Wiederverwendbar über Workflows.
    Skalares Content-Objekt (oder Chunk davon) mit Embedding."""
    id: str
    contentObjectId: str             # FK zum Content-Objekt
    fileId: str                      # FK zum File
    userId: str
    featureInstanceId: str
    contentType: str                 # text, image, videostream, audiostream, other
    data: str                        # Inhalt (Text, Base64, URL)
    contextRef: Dict[str, Any]       # Kontext-Referenz (Seite, Position, Label)
    summary: Optional[str]           # AI-generierte Zusammenfassung (bei Bedarf)
    metadata: Dict[str, Any] = {}
    embedding: List[float]           # pgvector (NOT NULL für text-Chunks)

class WorkflowMemory(BaseModel):
    """Workflow-spezifischer Cache."""
    id: str
    workflowId: str
    userId: str
    featureInstanceId: str
    key: str                         # z.B. "entity:companyName"
    value: str
    source: str                      # extraction, tool, conversation, summary
    createdAt: float
    embedding: Optional[List[float]]

4.5 RAG-Retrieval: 3-Ebenen-Suche

Vor jedem Agent-Round sucht der Context Manager über alle drei Ebenen:

async def _buildAgentContext(self, currentPrompt, workflowId, userId,
                              featureInstanceId, mandateId, contextBudget):
    # 1. Instance Layer: User-Dokumente (höchste Priorität)
    instanceChunks = await knowledgeStore.semanticSearch(
        query=currentPrompt, userId=userId,
        featureInstanceId=featureInstanceId, limit=15, minScore=0.65
    )
    # 2. Shared Layer: Globale Firmen-Dokumente
    sharedChunks = await knowledgeStore.semanticSearch(
        query=currentPrompt, mandateId=mandateId,
        isShared=True, limit=10, minScore=0.7
    )
    # 3. Workflow Layer: Aktueller Kontext
    entities = await knowledgeStore.getEntities(workflowId)
    summary = await knowledgeStore.getConversationSummary(workflowId)
    toolResults = await knowledgeStore.getRecentToolResults(workflowId, limit=5)
    
    context = ContextBuilder(budget=contextBudget)
    context.add(priority=1, content=currentPrompt)
    context.add(priority=2, content=instanceChunks)
    context.add(priority=3, content=entities)
    context.add(priority=4, content=sharedChunks)
    context.add(priority=5, content=summary)
    context.add(priority=6, content=toolResults)
    return context.build()

4.6 Cross-Workflow Wissenstransfer

Workflow A (Montag):
  User: "Analysiere Q4-Report.pdf"
  → Extraktion + Embedding im Instance Layer (200 Chunks)
  → Agent findet: Revenue = 12.5M CHF

Workflow B (Dienstag):
  User: "Vergleiche Q4-Report mit Budget.xlsx"
  → Q4-Report: Bereits im Knowledge Store → 0 Extraktion
  → Budget.xlsx: Automatisch indexiert nach Upload
  → Semantische Suche "Q4 Revenue" → sofort relevanter Chunk

4.7 Progressive Summarization

Statt alte Nachrichten abzuschneiden (trim_messages):

Round 1-3:  Volle Nachrichten
Round 4:    Nachrichten 1-2 → Summary
Round 7:    Summary(1-4) + volle 5-7
Round 15:   Meta-Summary(1-10) + Summary(11-13) + volle 14-15

Der Agent verliert nie den Faden. Summaries werden im Workflow Layer persistiert. Kosten: Jede Summarization ist ein AI-Call (günstigeres Modell, z.B. DATA_ANALYSE). Bei einem 15-Round-Workflow: ~3 Summary-Calls. Diese Kosten fliessen ins Workflow-Budget (maxCostCHF) ein und erscheinen als eigener Typ in der BillingTransaction.

4.8 Entity Cache

Extraktion wichtiger Fakten nach Tool-Ergebnissen. Opt-in per Config (jeder Aufruf ist ein Extra-AI-Call):

if config.entityCacheEnabled:
    entities = await _extractEntities(response)
    # {"company": "Acme AG", "revenueQ4": "12.5M CHF"}
    for key, value in entities.items():
        await knowledgeStore.upsertEntity(workflowId, key, value)

Kosten-Kontrolle: Entity-Extraktion ist standardmässig deaktiviert. Für häufig genutzte Entities (Firmennamen, Beträge, Daten) kann alternativ regelbasierte Extraktion (Regex/NER) ohne AI-Call eingesetzt werden.

4.9 Embedding-Strategie

Aspekt Entscheidung
Embedding-Modell OpenAI text-embedding-3-small (1536 dim); lokal: sentence-transformers
Vector Store pgvector auf bestehendem PostgreSQL
Chunk-Grösse 512 Tokens
Index-Typ IVFFlat (< 100k Chunks), HNSW (> 100k)
Suche Cosine Similarity, gefiltert nach userId + featureInstanceId

5. Smart Document Handling

5.1 Problem

Ein 200MB PDF mit 500 Seiten: Aktuell werden alle 500 Seiten blind extrahiert, jede einzeln an AI gesendet → 500+ AI-Calls.

5.2 Lösung: 3-Stufen-Modell

graph TD
  Upload["Dokument Upload"]
  
  subgraph stage1["Stufe 1: Structure Pre-Scan"]
    TOC["TOC Extraction"]
    Headings["Heading Detection"]
    PageMap["Page Map"]
  end
  
  subgraph stage2["Stufe 2: Selective Extraction"]
    OnDemand["Nur angeforderte Seiten"]
    Cache["Knowledge Store Cache"]
  end
  
  subgraph stage3["Stufe 3: Deep Analysis"]
    Vision["Vision AI"]
    TableEx["Tabellen-Extraktion"]
    OCR["OCR"]
  end
  
  Upload --> stage1
  stage1 -->|"Agent entscheidet"| stage2
  stage2 -->|"Bei Bedarf"| stage3

Stufe 1 -- Structure Pre-Scan (Sekunden, kein AI): Rein programmatisch mit PyMuPDF. Extrahiert TOC, Headings (Font-Size-Heuristik), Seitenstruktur, Bildpositionen. Ergebnis ist der FileContentIndex (siehe 5.4). Der Agent sieht sofort die Struktur eines 500-Seiten-PDFs.

async def preScanDocument(fileId: str) -> FileContentIndex:
    doc = fitz.open(stream=fileData, filetype="pdf")
    toc = doc.get_toc()
    
    pageMap = []
    for i, page in enumerate(doc):
        blocks = page.get_text("dict")["blocks"]
        pageMap.append({
            "pageIndex": i,
            "headings": [h for h in blocks if _isHeading(h)],
            "hasImages": any(b["type"] == 1 for b in blocks),
            "textLength": len(page.get_text()),
            "hasTable": _detectTableHeuristic(page)
        })
    
    sections = _buildSectionsFromTocOrHeadings(toc, pageMap)
    return FileContentIndex(id=fileId, structure={"toc": toc, "sections": sections, "pageMap": pageMap})

Stufe 2 -- On-Demand Extraction: Agent liest nur die Seiten, die er braucht. Ergebnisse werden im Knowledge Store gecacht.

async def readSection(fileId, sectionId):
    cached = await knowledgeStore.getCachedSection(fileId, sectionId)
    if cached:
        return cached
    
    section = _findSection(docIndex, sectionId)
    parts = _extractPages(fileData, section["startPage"], section["endPage"])
    await knowledgeStore.cacheExtraction(fileId, sectionId, parts)
    return _formatSectionResult(parts)

Stufe 3 -- Deep Analysis (nur bei explizitem Bedarf): Vision AI für Bilder, Tabellen-Extraktion, OCR -- nur für einzelne angeforderte Elemente.

5.3 Kostenvergleich

Szenario Alt (Pipeline) Neu (Smart)
"Fasse Kapitel 3 zusammen" (8/500 Seiten) 500 Seiten, 500+ AI-Calls Pre-Scan + 8 Seiten + 1 AI-Call
"Was steht auf Seite 47?" 500 Seiten, 500+ AI-Calls Pre-Scan + 1 Seite + 1 AI-Call
Zweite Frage zum selben Dokument Erneut 500 Seiten Aus Knowledge Store, 0 Extraktion
Gesamtanalyse 500+ AI-Calls ~21 AI-Calls (20 Section-Summaries + 1 Meta-Summary)

5.4 Container- und Content-Modell

Das System unterscheidet klar zwischen physischen Strukturen (Container, Files) und logischen Content-Objekten (Text, Bild, Audio, Video). Die gesamte Extraktion bis zu den Content-Objekten erfolgt ohne AI.

Physische Schicht: Container-Hierarchie

Container sind verschachtelt (n,m : 0..i):

graph TD
  ZIP["ZIP (Container)"]
  FA["folder-a/ (Container)"]
  FB["folder-b/ (Container)"]
  PDF["report.pdf (File → Typ-Extractor)"]
  IMG["photo.png (File → Typ-Extractor)"]
  DOCX["vertrag.docx (File → Typ-Extractor)"]
  CSV["data.csv (File → Typ-Extractor)"]
  
  ZIP --> FA
  ZIP --> FB
  FA --> PDF
  FA --> IMG
  FB --> DOCX
  FB --> CSV
Ebene Regel
ZIP / TAR / GZ Container → enthält n Folders + m Files
Folder Container → enthält n Folders + m Files
File Hat einen bestimmten Typ → wird vom passenden Extractor verarbeitet

File-Typen und ihre Container-Eigenschaft:

File-Typ Container? Struktur
PDF Ja Seiten als Kontext-Referenz, jede Seite enthält Content-Objekte
DOCX Ja Abschnitte/Paragraphen als Kontext-Referenz (Seiten nur als Referenz, nicht als Container)
PPTX Ja Slides als Kontext-Referenz, jeder Slide enthält Content-Objekte
XLSX Ja Sheets als Kontext-Referenz, jedes Sheet enthält Content-Objekte
PNG/JPG/GIF Nein File selbst ist ein einzelnes Content-Objekt (image)
MP4/WebM Nein File selbst ist ein einzelnes Content-Objekt (videostream)
MP3/WAV Nein File selbst ist ein einzelnes Content-Objekt (audiostream)
TXT/CSV/JSON/XML Nein File selbst ist ein einzelnes Content-Objekt (text)

Logische Schicht: Content-Objekte

Jeder Extractor liefert skalare Content-Objekte mit Kontext-Referenz zum Ursprung im File. Es gibt genau fünf Content-Typen:

ContentType Beschreibung Beispiel
text Textinhalt (Fliesstext, Tabelle als Markdown, Code) Paragraph auf Seite 3, Tabelle in Sheet "Q4"
image Einzelbild (als Base64 oder Referenz) Bild auf Seite 5 unten links, Caption "Übersicht"
videostream Video-Inhalt Eingebettetes Video in PPTX Slide 7
audiostream Audio-Inhalt Voice-Memo Attachment
other Nicht klassifizierbar (Binary, Formeln, etc.) Eingebettetes OLE-Objekt

Content-Objekt Datenmodell:

class ContentObject(BaseModel):
    id: str
    fileId: str                      # FK zum physischen File
    contentType: str                 # text, image, videostream, audiostream, other
    data: str                        # Inhalt (Text, Base64, URL)
    contextRef: ContentContextRef    # Kontext-Referenz zum Ursprung
    metadata: Dict[str, Any] = {}    # Extractor-spezifische Metadaten
    sequence: int = 0                # Reihenfolge innerhalb des Kontexts

class ContentContextRef(BaseModel):
    """Referenz zum Kontext im Container/File."""
    containerPath: str               # z.B. "archiv.zip/folder-a/report.pdf"
    location: str                    # z.B. "page:5/region:bottomLeft"
    label: Optional[str] = None      # z.B. "Abbildung 3: Übersicht"
    pageIndex: Optional[int] = None  # Seiten-Nummer (wenn relevant)
    sectionId: Optional[str] = None  # Abschnitt/Heading (wenn relevant)
    sheetName: Optional[str] = None  # Sheet-Name (XLSX)
    slideIndex: Optional[int] = None # Slide-Nummer (PPTX)

Extraktions-Pipeline (komplett ohne AI)

graph LR
  Input["ZIP / Folder / File"]
  
  subgraph phase1 ["Phase 1: Container auflösen"]
    Unpack["Entpacken\nrekursiv"]
    FileList["File-Liste\nmit Typen"]
  end
  
  subgraph phase2 ["Phase 2: Content extrahieren"]
    Ext["Typ-Extractor\npro File"]
    CO["Content-Objekte\nmit ContextRef"]
  end
  
  subgraph phase3 ["Phase 3: Indexierung"]
    Idx["ContentIndex\npro File"]
    KS["Knowledge Store"]
  end
  
  Input --> phase1
  phase1 --> phase2
  phase2 --> phase3
async def _extractFileContents(fileId: str, fileData: bytes, mimeType: str) -> FileContentIndex:
    """Extrahiert alle Content-Objekte eines Files. Kein AI-Call."""
    extractor = extractorRegistry.resolve(mimeType)
    contentObjects = await extractor.extract(fileData)
    
    contentIndex = FileContentIndex(
        fileId=fileId,
        totalObjects=len(contentObjects),
        structure=_buildStructureMap(contentObjects),
        objects=contentObjects
    )
    
    await knowledgeStore.upsertContentIndex(contentIndex)
    return contentIndex

Rekursive Container-Auflösung:

MAX_TOTAL_EXTRACTED_SIZE = 500 * 1024 * 1024  # 500 MB
MAX_FILE_COUNT = 10000

async def _resolveContainerRecursive(containerData, containerPath="", depth=0, maxDepth=5,
                                      _state=None):
    """Löst Container rekursiv auf bis zu den Files. Kein AI-Call.
    Schutz vor ZIP-Bombs: maxDepth, maxTotalExtractedSize, maxFileCount, Symlink-Blockierung."""
    if _state is None:
        _state = {"totalSize": 0, "fileCount": 0}
    allFiles = []
    entries = _listEntries(containerData, blockSymlinks=True)
    
    for entry in entries:
        entryPath = f"{containerPath}/{entry.name}" if containerPath else entry.name
        
        if entry.isContainer:  # Folder oder verschachteltes ZIP
            childData = _extractEntry(containerData, entry)
            childFiles = await _resolveContainerRecursive(
                childData, entryPath, depth + 1, maxDepth
            )
            allFiles.extend(childFiles)
        else:
            _state["totalSize"] += entry.size
            _state["fileCount"] += 1
            if _state["totalSize"] > MAX_TOTAL_EXTRACTED_SIZE:
                raise ContainerLimitError(f"Total extracted size exceeds {MAX_TOTAL_EXTRACTED_SIZE}")
            if _state["fileCount"] > MAX_FILE_COUNT:
                raise ContainerLimitError(f"File count exceeds {MAX_FILE_COUNT}")
            allFiles.append(FileEntry(
                path=entryPath,
                data=_extractEntry(containerData, entry),
                mimeType=_detectMimeType(entry.name),
                size=entry.size
            ))
    
    return allFiles

File Content Index (Metadaten pro File)

Jedes File erhält einen FileContentIndex als Metadaten (kanonische Definition siehe Abschnitt 4.4). Zusätzlich enthält jeder Index ContentObjectSummary-Einträge:

class ContentObjectSummary(BaseModel):
    """Kompakte Beschreibung eines Content-Objekts im Index."""
    id: str
    contentType: str                 # text, image, videostream, audiostream, other
    contextRef: ContentContextRef
    charCount: Optional[int] = None  # Nur für text
    dimensions: Optional[str] = None # Nur für image/video (z.B. "1920x1080")
    duration: Optional[float] = None # Nur für audio/video (Sekunden)

Beispiel für einen PDF-ContentIndex:

report.pdf → FileContentIndex:
  totalObjects: 47
  structure: {pages: 12, images: 8, tables: 5, textBlocks: 34}
  objectSummary:
    - {id: "co-1", type: "text", contextRef: {page: 1, location: "header"}, charCount: 245}
    - {id: "co-2", type: "text", contextRef: {page: 1, location: "body"}, charCount: 1830}
    - {id: "co-3", type: "image", contextRef: {page: 1, location: "bottom", label: "Abb. 1"}, dimensions: "800x600"}
    - ...

AI arbeitet auf skalaren Content-Objekten

Nach der Extraktion hat der Agent nur noch skalare Content-Objekte vor sich. AI-Calls werden gezielt auf einzelne Objekte angewendet:

Agent: "Analysiere Kapitel 3 des Reports"
  1. browseContainer("report.pdf") → ContentIndex (47 Objekte, Struktur)
  2. Agent sieht: Kapitel 3 = Seiten 5-8, 12 Content-Objekte
  3. readContentObjects(fileId, filter={pageIndex: [5,6,7,8]}) → 12 skalare Objekte
  4. AI-Call mit den 12 Objekten als Kontext → Analyse

Container-Tools für den Agent

Tool Funktion
browseContainer Zeigt ContentIndex / Baumstruktur eines Containers (ZIP, Folder, PDF, etc.)
readContentObjects Liest spezifische Content-Objekte nach Filter (Seite, Typ, Section)
extractContainerItem Extrahiert on demand ein Element, das noch nicht im Knowledge Store ist

Prompt-Referenzen: Im Unified UI können Container direkt referenziert werden:

  • @archiv.zip -- Agent sieht Baumstruktur, extrahiert gezielt
  • @projektordner/ -- Agent browst Ordnerinhalt
  • @report.pdf -- Agent sieht ContentIndex mit allen Content-Objekten

6. Document Tools

Tools für den Agent -- arbeiten auf der physischen (Container) und logischen (Content-Objekte) Schicht:

Container-Tools (physische Schicht):

Tool Funktion Intern
browseContainer Baumstruktur + ContentIndex eines Containers anzeigen Container-Auflösung + FileContentIndex
extractContainerItem Spezifisches File aus Container on demand extrahieren Rekursive Pipeline → Content-Objekte → Knowledge Store

Content-Tools (logische Schicht, skalare Objekte):

Tool Funktion Intern
readContentObjects Content-Objekte nach Filter lesen (Seite, Typ, Section) Knowledge Store / on-demand Extraktion
summarizeContent AI-basierte Zusammenfassung von Content-Objekten serviceAi.callAi() mit Content-Objekten als Input
analyzeContent AI-basierte Analyse mit Prompt über Content-Objekte serviceAi.callAi()
generateDocument DOCX/PDF/CSV erzeugen Bestehende Renderer
editDocument Bestehendes Dokument ändern Neue Logik

7. File Management

7.1 FileItem erweitern

class FileItem(BaseModel):
    # bestehend:
    id, mandateId, featureInstanceId, fileName, mimeType, fileHash, fileSize, creationDate
    
    # NEU:
    tags: List[str] = []
    folderId: Optional[str] = None
    description: Optional[str] = ""
    status: str = "active"           # active, archived, processing

7.2 FileFolder (neu)

class FileFolder(BaseModel):
    id: str
    parentId: Optional[str] = None
    name: str
    featureInstanceId: str
    mandateId: str
    createdBy: str
    autoRule: Optional[str] = None   # z.B. "tag:financial"

7.3 File Tools

Tool Funktion
listFiles Auflisten mit Filter (tags, folder, mimeType, pattern)
readFile Inhalt lesen
writeFile Erstellen oder überschreiben
moveFile In Ordner verschieben
tagFile Tags hinzufügen/entfernen
searchFiles Volltextsuche über Inhalte

7.4 File-Referenzen im Prompt

  • @filename.pdf -- Autocomplete-Referenz
  • Drag-and-Drop Upload -- automatisch als File gespeichert + indexiert
  • Agent erhält fileIds und arbeitet via Tools

8. Data Containers (externe Datenquellen)

8.1 Konzept

User kann externe Datenquellen aus UserConnections (OAuth: Microsoft, Google) im Chat referenzieren. Konfiguration pro FeatureInstance.

8.2 DataSource (neu)

class DataSource(BaseModel):
    id: str
    connectionId: str               # FK zu UserConnection
    sourceType: str                  # sharepointFolder, googleDriveFolder, outlookFolder, ftpFolder
    path: str                        # z.B. "/sites/MySite/Documents/Reports"
    label: str
    featureInstanceId: Optional[str]
    autoSync: bool = False
    lastSynced: Optional[float]

8.3 Provider-Connector-Architektur (1:n)

Kernprinzip: Ein Connector ist pro Provider (Lieferant), nicht pro Service. Eine MSFT-Connection bedient SharePoint, Outlook, Teams etc. gleichzeitig. Die Beziehung ist 1 Provider : n Services.

IST-Zustand: SharePoint und Outlook haben je eigene Services, Connection-Helper und Method-Actions, obwohl beide über dieselbe MSFT-Connection laufen. Es gibt keine generische Abstraktion. Google Drive, FTP etc. sind nicht implementiert.

Lösung: ProviderConnector + ServiceAdapter

graph TB
  AgentLoop["Agent Loop"]
  GenTool["Generische Connection-Tools"]
  
  subgraph providers ["ProviderConnector (1 pro Lieferant)"]
    MSFT["MsftConnector\n1 Connection → n Services"]
    Google["GoogleConnector\n1 Connection → n Services"]
    FtpProv["FtpConnector\n1 Connection → 1 Service"]
  end
  
  subgraph msftServices ["MSFT Services (über 1 Token)"]
    SP["SharePoint\nFiles, Sites"]
    OL["Outlook\nMail, Calendar"]
    Teams["Teams\nMessages, Channels"]
    OneDrive["OneDrive\nFiles"]
  end
  
  subgraph googleServices ["Google Services (über 1 Token)"]
    GDrive["Google Drive\nFiles"]
    GMail["Gmail\nMail"]
  end
  
  AgentLoop --> GenTool
  GenTool --> providers
  MSFT --> msftServices
  Google --> googleServices
class ProviderConnector(ABC):
    """Ein Connector pro Provider. Verwaltet eine UserConnection + Token.
    Bietet Zugriff auf n Services des Providers."""

    def __init__(self, connection: UserConnection, accessToken: str):
        self.connection = connection
        self.accessToken = accessToken

    @abstractmethod
    def getAvailableServices(self) -> List[str]:
        """Welche Services bietet dieser Provider?"""

    @abstractmethod
    def getServiceAdapter(self, service: str) -> "ServiceAdapter":
        """Gibt den ServiceAdapter für einen bestimmten Service zurück."""


class ServiceAdapter(ABC):
    """Standardisierte Operationen pro Service eines Providers."""

    @abstractmethod
    async def browse(self, path: str, filter: str = None) -> List[ExternalEntry]: ...

    @abstractmethod
    async def download(self, path: str) -> bytes: ...

    @abstractmethod
    async def upload(self, path: str, data: bytes, fileName: str) -> ExternalEntry: ...

    @abstractmethod
    async def search(self, query: str, path: str = None) -> List[ExternalEntry]: ...

Konkrete Provider und ihre Services:

ProviderConnector Authority Services Status
MsftConnector MSFT sharepoint (Files, Sites), outlook (Mail, Calendar), teams (Messages), onedrive (Files) Migration bestehender Services
GoogleConnector GOOGLE drive (Files), gmail (Mail) Neu
FtpConnector LOCAL files (FTP/SFTP) Neu
JiraConnector LOCAL tickets (Issues, Boards) Migration bestehender Jira-Actions

ConnectorResolver:

class ConnectorResolver:
    _providerRegistry = {
        "msft": MsftConnector,
        "google": GoogleConnector,
        "local:ftp": FtpConnector,
        "local:jira": JiraConnector,
    }

    async def resolve(self, connectionId: str) -> ProviderConnector:
        """Löst connectionId → Provider-Connector mit frischem Token auf."""
        connection = await _getUserConnection(connectionId)
        token = await _getFreshToken(connectionId)
        providerClass = self._providerRegistry[connection.authority]
        return providerClass(connection, token.tokenAccess)

    async def resolveService(self, connectionId: str, service: str) -> ServiceAdapter:
        """Löst connectionId + service → konkreten ServiceAdapter auf."""
        provider = await self.resolve(connectionId)
        return provider.getServiceAdapter(service)

Beispiel: MsftConnector (1 Connection → n Services):

class MsftConnector(ProviderConnector):

    def getAvailableServices(self) -> List[str]:
        return ["sharepoint", "outlook", "teams", "onedrive"]

    def getServiceAdapter(self, service: str) -> ServiceAdapter:
        adapters = {
            "sharepoint": SharepointAdapter(self.accessToken),
            "outlook": OutlookAdapter(self.accessToken),
            "teams": TeamsAdapter(self.accessToken),
            "onedrive": OneDriveAdapter(self.accessToken),
        }
        return adapters[service]

8.4 Connection Tools (generisch)

Ein generisches Tool-Set -- der Agent gibt connectionId + service an, der Rest läuft über den Provider:

Tool Funktion Intern
listConnections Verfügbare Connections + ihre Services anzeigen UserConnections + getAvailableServices()
externalBrowse Ordner/Files einer externen Quelle listen resolveService(connId, service).browse(path)
externalDownload Datei herunterladen → lokales File + Auto-Index adapter.download(path)saveUploadedFile → Knowledge Store
externalUpload Lokales File in externe Quelle hochladen adapter.upload(path, data)
externalSearch In externer Quelle suchen adapter.search(query)
sendMail E-Mail senden (Mail-spezifische Parameter) resolveService(connId, "outlook").send()

Die bestehenden MethodSharepoint- und MethodOutlook-Actions werden schrittweise hinter MsftConnectorSharepointAdapter / OutlookAdapter migriert. Während der Migration fungiert der ActionToolAdapter (siehe Abschnitt 10.3) als Brücke.

8.5 Ablauf

User: "Analysiere die letzten 3 Reports aus SharePoint/Reports"

Agent → externalBrowse(connectionId="msft-123", service="sharepoint", path="/Reports", filter="*.pdf")
Agent → externalDownload(connectionId="msft-123", service="sharepoint", path="/Reports/Q4-Report.pdf")
        → File wird lokal gespeichert + automatisch im Knowledge Store indexiert
Agent → readContentObjects(fileId="abc123", filter={contentType: "text"})
        → Bereits indexiert, Content-Objekte aus Knowledge Store
Agent → Antwortet mit Analyse

Gleicher Provider, anderer Service:

User: "Sende die Analyse per Mail an das Team"

Agent → sendMail(connectionId="msft-123", to=["team@firma.ch"], subject="Q4 Analyse", body="...")
        → Selbe MSFT-Connection, Service "outlook"

Anderer Provider, identisches Tool-Interface:

User: "Lade das Budget aus meinem Google Drive"

Agent → externalBrowse(connectionId="google-456", service="drive", path="/Finanzen", filter="Budget*")
Agent → externalDownload(connectionId="google-456", service="drive", path="/Finanzen/Budget-2026.xlsx")
        → Identischer Flow: lokales File + Knowledge Store

9. Unified Workspace UI

9.1 Kernidee

Der Codeeditor hat die beste UI-Architektur (SSE Streaming, Agent-Progress, File-Management). Dieses UI wird zur Grundlage für alle AI-Interaktionen. Chatbot und Playground gehen darin auf.

9.2 Layout

graph LR
  subgraph unified["Unified AI Workspace"]
    subgraph left["Linke Sidebar"]
      ConvList["Conversation List"]
      FileTree["File Browser\nFolders + Tags"]
      DataSrc["Data Sources\nSharePoint, Drive"]
    end
    
    subgraph center["Hauptbereich"]
      ChatStream["Chat / Streaming\nMarkdown + Code + Edits\nTabellen + Charts"]
    end
    
    subgraph right["Rechte Sidebar"]
      FilePreview["File Preview / Editor"]
      ToolActivity["Tool Activity Log"]
    end
    
    subgraph bottom["Input-Bereich"]
      PromptInput["Text + Voice"]
      AttachBar["@file + Upload + DataSources"]
      ActionBar["Send / Stop / Voice"]
    end
  end

9.3 UI-Komponenten

Linke Sidebar:

  • Conversation List: Alle Workflows/Chats, sortiert nach Datum
  • File Browser: Lokale Files mit Ordnerstruktur und Tags, Upload per Drag-and-Drop
  • Data Sources: Konfigurierte externe Quellen (SharePoint, Drive) pro FeatureInstance

Hauptbereich:

  • Chat Stream: SSE-basiertes Streaming mit Token-Chunks, Markdown-Rendering, Code-Blöcke, File-Edit-Proposals, Tabellen, Charts

Rechte Sidebar (kontextabhängig):

  • File Preview / Editor: Vorschau oder Bearbeitung ausgewählter Files
  • Tool Activity Log: Zeigt in Echtzeit, welche Tools der Agent nutzt

Input-Bereich:

  • Text-Eingabe mit @file-Autocomplete
  • Voice-Toggle (Mikrofon-Button)
  • Attachment-Bar mit aktiven Files und DataSources
  • Send / Stop / Voice-Buttons

9.4 SSE Event-Typen

Bestehend (vom Codeeditor übernommen):

Event Funktion
message Text-Nachricht (user/assistant)
status Progress-Label
fileEditProposal Datei-Änderungsvorschlag
fileVersion Akzeptierte Änderung
agentProgress Round/Tool-Call Fortschritt
agentSummary Abschluss-Statistik
complete / stopped / error Workflow-Ende

Neu:

Event Funktion
chunk Token-Streaming für Echtzeit-Textausgabe
toolCall Agent ruft Tool auf (für Activity Log)
toolResult Tool-Ergebnis (für Activity Log)
fileCreated Neues File erstellt
dataSourceAccess Zugriff auf externe Quelle
voiceResponse TTS Audio-Daten

9.5 Voice Integration

  • STT: Browser Web Speech API oder Whisper API. Mikrofon-Button im Input-Bereich. Transkribierter Text wird als Prompt gesendet.
  • TTS: Agent-Antworten optional vorlesen. Nutzt bestehende interfaceVoiceObjects oder Browser TTS.
  • Voice-Toggle: Input via Mikrofon, Output via TTS.
POST /api/workspace/{instanceId}/voice/transcribe   → { "text": "..." }
POST /api/workspace/{instanceId}/voice/synthesize    → audio blob

9.6 Prompt Input Request

class WorkspaceInputRequest(BaseModel):
    prompt: str
    fileIds: List[str] = []
    uploadedFiles: List[UploadFile] = []
    dataSourceIds: List[str] = []
    voiceMode: bool = False
    workflowId: Optional[str] = None
    userLanguage: str = "en"

10. Feature Integration

10.1 Unified Workspace ersetzt drei Features

Chatbot, Codeeditor und Playground migrieren auf serviceAgent + Unified UI. Die Unterscheidung erfolgt über die Tool-Konfiguration pro FeatureInstance:

Feature (bisher) Tool-Set
Chatbot (SQL) core + sqlQuery + webSearch
Chatbot (Tavily) core + webSearch
Codeeditor core + readFile + writeFile + applyFileEdit + searchFiles
Playground core + webSearch + sendMail
Trustee core + connectionTools + documentAnalysis

core = readFile, writeFile, listFiles, extractContent, summarizeContent, generateDocument

10.2 serviceAi als Low-Level-Layer

serviceAi bleibt bestehen für:

  • Modell-Auswahl und Fallback
  • Billing pro Call
  • Provider-Routing (OpenAI, Anthropic, PrivateLLM)
  • Native function calling Support

Die Orchestrierungs-Logik (Structure Filling, Looping, JSON Repair) wandert in den Agent.

10.3 Method/Action-Integration (bestehende Workflow-Actions als Tools)

IST-Zustand: Das Workflow-System hat 7 Methods mit 36 Actions, registriert via methodDiscovery.py. Jede Action hat actionId, description, parameters (Schema) und execute (async). 22 davon haben dynamicMode=True und werden bereits dynamisch von AI ausgewählt. Die Ausführung läuft über actionExecutor.executeAction(method, action, params).

Kategorie Methods Actions dynamicMode
AI ai process, webResearch, summarizeDocument, translateDocument, convertDocument, generateDocument, generateCode Ja
External sharepoint, outlook 9 SharePoint + 4 Outlook Actions Ja
Domain context, trustee getDocumentIndex, extractContent, neutralizeData, extractFromFiles, processDocuments, syncToAccounting Teilweise
Integration jira, chatbot 8 Jira + 1 Chatbot Actions Nein/Teilweise

Lösung: 3-Stufen-Migration

graph LR
  subgraph stufe1 ["Stufe 1: Automatisches Wrapping"]
    Actions["Bestehende Actions\n36 Actions"]
    Adapter["ActionToolAdapter"]
    ToolReg["Tool Registry"]
  end
  
  subgraph stufe2 ["Stufe 2: Refactoring"]
    NativeTools["Native Agent Tools"]
    ConnTools["ProviderConnector Tools"]
  end
  
  subgraph stufe3 ["Stufe 3: Abloesung"]
    AgentNative["Agent-native Logik\nersetzt AI-Actions"]
  end
  
  Actions --> Adapter
  Adapter --> ToolReg
  ToolReg --> NativeTools
  NativeTools --> ConnTools
  ConnTools --> AgentNative

Stufe 1 -- Automatisches Wrapping (sofort, kein Rewrite):

Der ActionToolAdapter wandelt jede Action mit dynamicMode=True automatisch in ein Agent-Tool um. Kein Code-Rewrite nötig:

class ActionToolAdapter:
    """Wrapped bestehende Workflow-Actions als Agent-Tools."""

    def _buildToolsFromActions(self) -> List[ToolDefinition]:
        tools = []
        for methodName, method in methods.items():
            for actionName, action in method["actions"].items():
                if not action.get("dynamicMode"):
                    continue
                tools.append(ToolDefinition(
                    name=f"{methodName}.{actionName}",
                    description=action["description"],
                    parameters=_convertParameterSchema(action["parameters"])
                ))
        return tools

    async def dispatch(self, toolName: str, args: Dict) -> ToolResult:
        methodName, actionName = toolName.split(".", 1)
        result = await actionExecutor.executeAction(methodName, actionName, args)
        return ToolResult(
            success=result.success,
            data=_formatActionResult(result),
            error=result.error
        )

Vorteil: Alle 22 dynamischen Actions sofort als Agent-Tools verfügbar.

Stufe 2 -- Schrittweises Refactoring:

  • External-Actions (SharePoint, Outlook, Jira) → hinter ProviderConnector + ServiceAdapter migrieren → generische externalBrowse/Download/Upload/Search Tools (siehe Abschnitt 8.3)
  • Domain-Actions (Trustee, Context) → bleiben als spezialisierte Tools, werden direkt in der toolRegistry registriert statt via Adapter

Stufe 3 -- AI-Actions ablösen:

  • ai.process, ai.generateDocument, ai.generateCode etc. werden NICHT als Tools exponiert
  • Diese Fähigkeiten sind im Agent selbst (der Agent IST die AI) -- er nutzt direkt extractContent, generateDocument, readSection etc.
  • subStructureFilling, subStructureGeneration, subAiCallLooping entfallen

Kein kompletter Rewrite nötig: Die bestehenden Methods/Actions bleiben initial funktionsfähig (Chatplayground, Automation nutzen sie weiter). Der Agent nutzt sie via Adapter. Schrittweise Migration ohne Breaking Changes.


11. Background Workflows

11.1 Persistente Workflow-Queue

Background-Workflows dürfen nicht auf asyncio.create_task() basieren (Server-Restart = Tasks verloren). Stattdessen eine DB-basierte Queue auf bestehendem PostgreSQL:

class BackgroundJob(BaseModel):
    """Persistierter Background-Job in PostgreSQL."""
    id: str
    workflowId: str
    userId: str
    featureInstanceId: str
    status: str                      # queued, running, completed, failed, cancelled
    description: str
    subtasks: List[Dict[str, Any]]
    maxCostCHF: Optional[float]      # Billing-Cap
    maxConcurrency: int = 1
    retryCount: int = 0
    maxRetries: int = 3
    createdAt: float
    startedAt: Optional[float]
    completedAt: Optional[float]
    error: Optional[str]
    progress: float = 0.0
    resultSummary: Optional[str]

Worker-Prozess: Ein Background-Worker pollt die Queue und führt Jobs aus:

async def _processBackgroundQueue():
    while True:
        job = await backgroundQueue.dequeueNext()
        if not job:
            await asyncio.sleep(2)
            continue
        
        try:
            await backgroundQueue.updateStatus(job.id, "running")
            async for event in runAgent(job.subtasks, job.workflowId, ...):
                await backgroundQueue.updateProgress(job.id, event)
                
                # Billing-Cap prüfen
                if job.maxCostCHF and await billingService.getWorkflowCost(job.workflowId) > job.maxCostCHF:
                    await backgroundQueue.updateStatus(job.id, "cancelled", error="budgetExceeded")
                    await _notifyUser(job.userId, f"Background-Job gestoppt: Budget überschritten")
                    break
            
            await backgroundQueue.updateStatus(job.id, "completed")
        except Exception as e:
            if job.retryCount < job.maxRetries:
                await backgroundQueue.requeueWithRetry(job.id)
            else:
                await backgroundQueue.updateStatus(job.id, "failed", error=str(e))
                await _notifyUser(job.userId, f"Background-Job fehlgeschlagen: {e}")

11.2 Tools

# Agent-Tool: Background-Job starten
async def startBackground(description, subtasks, maxCostCHF=None):
    job = await backgroundQueue.enqueue(BackgroundJob(
        workflowId=workflowId, description=description,
        subtasks=subtasks, maxCostCHF=maxCostCHF, ...
    ))
    return f"Background-Job {job.id} gestartet."

# Agent-Tool: Job-Status abfragen
async def checkBackgroundStatus(jobId):
    job = await backgroundQueue.getJob(jobId)
    return {"status": job.status, "progress": job.progress, "result": job.resultSummary}

11.3 Komplexe Ausgaben

Eine Agent-Antwort kann mehrere Ausgabe-Typen kombinieren:

  • Text (Markdown) mit Token-Streaming
  • Generierte/bearbeitete Files als Attachments
  • Gestartete Hintergrund-Jobs mit Progress-Tracking
  • Tabellen, Charts
  • Voice-Ausgabe
  • Externe Aktionen (Mail, SharePoint-Upload)

12. Observability und Monitoring

12.1 Agent-Tracing

Jeder Agent-Workflow wird vollständig protokolliert. Ein AgentTrace aggregiert alle Rounds, Tool-Calls und Kosten:

class AgentTrace(BaseModel):
    """Vollständiges Protokoll eines Agent-Workflows."""
    workflowId: str
    userId: str
    featureInstanceId: str
    startedAt: float
    completedAt: Optional[float]
    status: str                      # running, completed, maxRoundsReached, budgetExceeded, error
    totalRounds: int
    totalToolCalls: int
    totalCostCHF: float
    abortReason: Optional[str]
    rounds: List[AgentRoundLog]

class AgentRoundLog(BaseModel):
    """Log eines einzelnen Agent-Rounds."""
    roundNumber: int
    aiModel: str                     # Welches Modell wurde genutzt
    inputTokens: int
    outputTokens: int
    costCHF: float
    toolCalls: List[ToolCallLog]     # Welche Tools mit welchen Args/Results
    durationMs: int

class ToolCallLog(BaseModel):
    """Log eines einzelnen Tool-Calls."""
    toolName: str
    args: Dict[str, Any]
    success: bool
    durationMs: int
    error: Optional[str]

12.2 Kosten-Transparenz

Die bestehende BillingTransaction erfasst einzelne AI-Calls. Zusätzlich wird pro Workflow eine Kosten-Aggregation geführt:

Posten Quelle Tracking
Agent-Rounds (AI-Calls) serviceAi.callAi()billingCallback Automatisch via bestehende Billing-Integration
Entity Cache (Opt-in) _extractEntities()serviceAi.callAi() Eigener description-Typ in BillingTransaction
Progressive Summaries Summary-Calls → serviceAi.callAi() Eigener description-Typ in BillingTransaction
Embeddings embeddingService.embed() Neuer Billing-Posten

12.3 Monitoring

Metriken pro FeatureInstance / Mandate:

  • Agent-Workflows pro Zeitraum (Anzahl, Durchschnittsdauer, Kosten)
  • Tool-Nutzung (welche Tools, Erfolgsrate, Durchschnittsdauer)
  • Budget-Auslastung (Kosten vs. maxCostCHF)
  • Knowledge Store (Anzahl Files, Chunks, Embedding-Grösse)
  • Abbruchgründe (maxRounds, Budget, Fehler)

13. Umsetzungsplan

graph LR
  P1["Phase 1\nAgent Core\n+ ActionToolAdapter"]
  P2["Phase 2\nKnowledge Store"]
  P3["Phase 3\nSmart Documents\n+ Container Handling"]
  P4["Phase 4\nDocument Tools"]
  P5["Phase 5\nFile Management"]
  P6["Phase 6\nData Containers\n+ ProviderConnectors"]
  P7["Phase 7\nUnified UI"]
  P8["Phase 8\nFeature Migration\n+ Action Refactoring"]
  P9["Phase 9\nBackground Workflows"]
  
  P1 --> P2
  P1 --> P5
  P2 --> P3
  P3 --> P4
  P5 --> P6
  P4 --> P8
  P6 --> P8
  P7 --> P8
  P8 --> P9
Phase Inhalt Abhängigkeit
1 Agent Core: Loop, ToolRegistry, ConversationManager, AiCallRequest erweitern. ActionToolAdapter: bestehende 22 dynamicMode-Actions automatisch als Agent-Tools wrappen. serviceAgent als IMPORTABLE_SERVICE registrieren Grundlage
2 Knowledge Store: pgvector, 3-Ebenen, Auto-Index bei Upload, RAG-Retrieval Phase 1
3 Smart Documents: Pre-Scan, On-Demand Extraction, Progressive Summarization. Container Handling: ContainerExtractor (ZIP/TAR), FolderExtractor, rekursive Extraktion, Lazy Loading Phase 2
4 Document Tools: extractContent, readSection, browseContainer, extractContainerItem mit Knowledge-Store-Integration Phase 3
5 File Management: Tags, Folders, File Tools. Container-Referenzen im Prompt (@archiv.zip, @ordner/) Phase 1 (parallel zu 2-4)
6 Data Containers: DataSource, ProviderConnector-Architektur (1 Provider : n Services), ConnectorResolver, generische externalBrowse/Download/Upload/Search Tools. MsftConnector, GoogleConnector, FtpConnector Phase 5
7 Unified UI: Codeeditor-Basis, Streaming, Voice, Panels Parallel zu 2-6
8 Feature Integration: Chatbot + Codeeditor + Playground migrieren. Action Refactoring: External-Actions → ProviderConnector + ServiceAdapter, Domain-Actions → native Tools, AI-Actions entfallen (Agent ist die AI) Phase 4 + 6 + 7
9 Background Workflows: DB-basierte Queue (BackgroundJob), Worker-Prozess, Retry, Billing-Cap, Observability (AgentTrace, Monitoring) Phase 8

14. Was bleibt, was geht

Komponente Status
serviceAi.callAi() + Modell-Fallback + Billing Bleibt (Low-Level)
serviceExtraction.extractContent() Bleibt + erweitert (Pre-Scan, On-Demand, Container Handling)
aicoreModelSelector + aicoreModelRegistry Bleibt
serviceStreaming / EventManager Bleibt (zentral für UI)
UserConnections + OAuth Bleibt + erweitert (DataSource, ProviderConnector 1:n)
interfaceVoiceObjects (STT/TTS) Bleibt (Unified UI)
SharePoint Graph API Service Bleibt → wird zu SharepointAdapter hinter MsftConnector
PostgreSQL Bleibt + erweitert (pgvector, FileContentIndex, ContentChunk, WorkflowMemory)
Workflow Methods/Actions (36 Actions) Phase 1: ActionToolAdapter-Wrapping → Phase 8: schrittweise Migration
MethodSharepoint (9 Actions) Migriert → MsftConnectorSharepointAdapter + generische Connection Tools
MethodOutlook (4 Actions) Migriert → MsftConnectorOutlookAdapter + generische Connection Tools
MethodAi (7 Actions: process, generate, etc.) Abgelöst (Agent ist die AI, nutzt Document Tools direkt)
MethodJira (8 Actions) Migriert → JiraConnector + JiraAdapter
MethodTrustee, MethodContext Bleiben als spezialisierte Agent-Tools
actionExecutor + methodDiscovery Bleibt während Migration (Adapter), später optional
subStructureFilling (Pipeline) Abgelöst (Agent + Document Tools + Knowledge Store)
subStructureGeneration Abgelöst
subAiCallLooping (JSON Repair) Abgelöst (Agent iteriert natürlich)
trim_messages(strategy="last") Ersetzt durch Progressive Summarization
Codeeditor codeEditorProcessor Migriert zu serviceAgent
Codeeditor UI (SSE, Events) Basis für Unified UI
Chatbot LangGraph + DatabaseCheckpointer Abgelöst (serviceAgent + Knowledge Store)
Chatplayground Abgelöst (Unified UI)
datamodelFiles.py FileItem Erweitert (tags, folders, description)
BinaryExtractor (ZIP Fallback) Ersetzt durch ContainerExtractor (rekursive Extraktion)