This commit is contained in:
ValueOn AG 2025-10-22 16:48:52 +02:00
parent 62bea8c8aa
commit 02fdbeb726
6 changed files with 1980 additions and 1 deletions

View file

@ -14,7 +14,6 @@ Connector_AiLangdoc_API_URL = https://api.langdock.com/v1/chat/completions
Connector_AiLangdoc_API_SECRET = YOUR_LANGDOC_API_KEY_HERE
Connector_AiLangdoc_MODEL_NAME = gpt-4o
Connector_AiLangdoc_TEMPERATURE = 0.2
Connector_AiLangdoc_MAX_TOKENS = 2000
```
### API-Schlüssel erhalten

View file

@ -0,0 +1,419 @@
# Module Import Dependencies Diagram
Dieses Dokument zeigt die vollständigen Import-Abhängigkeiten zwischen allen Modulen im Gateway-System.
## Mermaid Diagram
```mermaid
graph TB
%% External Dependencies
subgraph "External Libraries"
EXT1[FastAPI]
EXT2[SQLAlchemy]
EXT3[Pydantic]
EXT4[OpenAI]
EXT5[LangChain]
EXT6[ReportLab]
EXT7[PyMuPDF]
EXT8[BeautifulSoup4]
EXT9[Requests]
EXT10[Pillow]
EXT11[NumPy]
EXT12[Pandas]
EXT13[Matplotlib]
EXT14[Office365-REST]
EXT15[Google Cloud APIs]
EXT16[Cryptography]
EXT17[JWT]
EXT18[MySQL Connector]
EXT19[PostgreSQL]
EXT20[Selenium]
EXT21[Tavily]
EXT22[MSAL]
EXT23[APScheduler]
EXT24[Pytest]
end
%% Main Module Categories
subgraph "ROUTES"
R1[routeChatbot]
R2[routeChatPlayground]
R3[routeDataNeutralization]
R4[routeDataUsers]
R5[routeDataPrompts]
R6[routeWorkflows]
R7[routeDataMandates]
R8[routeDataFiles]
end
subgraph "FEATURES"
F1[chatBot]
F2[chatPlayground]
F3[neutralizePlayground]
F4[syncDelta]
F5[featuresLifecycle]
end
subgraph "SERVICES"
S1[serviceAi]
S2[serviceExtraction]
S3[serviceGeneration]
S4[serviceNeutralization]
S5[serviceWorkflow]
S6[serviceUtils]
S7[serviceTicket]
S8[serviceSharepoint]
S9[serviceNormalization]
end
subgraph "WORKFLOWS"
W1[workflowManager]
W2[workflowProcessor]
W3[processing/core]
W4[processing/modes]
W5[processing/shared]
W6[processing/adaptive]
W7[methods]
end
subgraph "DATAMODELS"
D1[datamodelChat]
D2[datamodelAi]
D3[datamodelExtraction]
D4[datamodelWeb]
D5[datamodelUam]
D6[datamodelNeutralizer]
D7[datamodelSecurity]
D8[datamodelChatbot]
end
subgraph "INTERFACES"
I1[interfaceDbAppObjects]
I2[interfaceDbChatObjects]
I3[interfaceDbComponentObjects]
I4[interfaceDbChatAccess]
I5[interfaceAiObjects]
I6[interfaceTicketObjects]
end
subgraph "CONNECTORS"
C1[connectorDbPostgre]
C2[connectorDbMysql]
C3[connectorAiOpenai]
C4[connectorAiTavily]
C5[connectorSharepoint]
C6[connectorGoogle]
end
subgraph "SHARED"
SH1[configuration]
SH2[progressLogger]
SH3[eventManagement]
SH4[timezoneUtils]
SH5[jsonUtils]
SH6[attributeUtils]
SH7[debugLogger]
end
subgraph "SECURITY"
SEC1[tokenManager]
SEC2[encryption]
SEC3[authentication]
end
%% Routes to Features
R1 --> F1
R2 --> F2
R3 --> F3
R4 --> I1
R5 --> I3
R6 --> I2
R7 --> I1
R8 --> I3
%% Features to Services/Workflows
F1 --> D8
F1 --> D5
F1 --> SH1
F1 --> C4
F2 --> D5
F2 --> D1
F2 --> W1
F2 --> S1
F3 --> D5
F3 --> D6
F3 --> S1
F4 --> S1
F5 --> I1
%% Services Internal Dependencies
S1 --> S2
S1 --> D1
S1 --> D2
S1 --> D3
S1 --> D4
S1 --> I5
S1 --> S1_subCoreAi
S1 --> S1_subDocumentProcessing
S1 --> S1_subWebResearch
S1 --> S1_subDocumentGeneration
S2 --> D3
S2 --> D1
S2 --> D2
S2 --> I5
S2 --> S2_subRegistry
S2 --> S2_subPipeline
S2 --> S2_subMerger
S2 --> S2_subUtils
S3 --> D1
S3 --> D2
S3 --> I5
S3 --> S3_subDocumentUtility
S3 --> S3_subPromptBuilder
S4 --> D6
S4 --> S4_subProcessCommon
S4 --> S4_subProcessText
S4 --> S4_subProcessList
S4 --> S4_subProcessBinary
S4 --> S4_subPatterns
S4 --> S4_subParseString
S5 --> D5
S5 --> D1
S5 --> SEC1
S5 --> SH2
S6 --> SH1
S6 --> SH3
S6 --> SH4
S6 --> SH5
S7 --> I6
%% Workflows Internal Dependencies
W1 --> D1
W1 --> W2
W2 --> D1
W2 --> W4_modeBase
W2 --> W4_modeActionplan
W2 --> W4_modeReact
W3_taskPlanner --> D1
W3_taskPlanner --> D2
W3_taskPlanner --> W5_promptGenerationTaskplan
W3_taskPlanner --> W6_IntentAnalyzer
W3_actionExecutor --> D1
W3_actionExecutor --> W5_methodDiscovery
W3_messageCreator --> D1
W4_modeBase --> D1
W4_modeBase --> W3_taskPlanner
W4_modeBase --> W3_actionExecutor
W4_modeBase --> W3_messageCreator
W4_modeBase --> W3_validator
W4_modeActionplan --> D1
W4_modeActionplan --> D2
W4_modeActionplan --> W4_modeBase
W4_modeActionplan --> W5_executionState
W4_modeActionplan --> W5_promptGenerationActionsActionplan
W4_modeReact --> D1
W4_modeReact --> D2
W4_modeReact --> W4_modeBase
W4_modeReact --> W5_executionState
W4_modeReact --> W5_promptGenerationActionsReact
W4_modeReact --> W5_placeholderFactory
W4_modeReact --> W6_IntentAnalyzer
W4_modeReact --> W6_ContentValidator
W4_modeReact --> W6_LearningEngine
W4_modeReact --> W6_ProgressTracker
W4_modeReact --> W6_AdaptiveLearningEngine
W5_executionState --> D1
W5_methodDiscovery --> D1
W5_methodDiscovery --> W7_methodBase
W5_placeholderFactory --> D1
W5_placeholderFactory --> W5_methodDiscovery
W5_promptGenerationActionsActionplan --> D1
W5_promptGenerationActionsActionplan --> W5_placeholderFactory
W5_promptGenerationActionsReact --> D1
W5_promptGenerationActionsReact --> W5_placeholderFactory
W5_promptGenerationActionsReact --> W5_methodDiscovery
W5_promptGenerationTaskplan --> D1
W5_promptGenerationTaskplan --> W5_placeholderFactory
W7_methodAi --> W7_methodBase
W7_methodAi --> D1
W7_methodAi --> D2
W7_methodAi --> D4
W7_methodOutlook --> W7_methodBase
W7_methodOutlook --> D1
W7_methodOutlook --> D2
W7_methodSharepoint --> W7_methodBase
W7_methodSharepoint --> D1
%% Interfaces to Connectors
I1 --> C2
I2 --> C1
I3 --> C2
I4 --> C1
I5 --> C3
I6 --> C5
%% Shared Dependencies
SH1 --> EXT1
SH2 --> EXT2
SH3 --> EXT23
SH4 --> EXT11
SH5 --> EXT3
SH6 --> EXT3
SH7 --> EXT1
%% Security Dependencies
SEC1 --> EXT16
SEC1 --> EXT17
SEC2 --> EXT16
SEC3 --> EXT16
%% Service Extraction Sub-components
subgraph "SERVICE_EXTRACTION_SUB"
S2_subRegistry[subRegistry]
S2_subPipeline[subPipeline]
S2_subMerger[subMerger]
S2_subUtils[subUtils]
S2_extractors[extractors/*]
S2_chunkers[chunkers/*]
S2_mergers[mergers/*]
end
S2_extractors --> S2_subUtils
S2_extractors --> S2_subRegistry
S2_chunkers --> S2_subRegistry
S2_mergers --> S2_subUtils
%% Service Generation Sub-components
subgraph "SERVICE_GENERATION_SUB"
S3_subDocumentUtility[subDocumentUtility]
S3_subPromptBuilder[subPromptBuilder]
S3_renderers[renderers/*]
S3_rendererBase[rendererBaseTemplate]
end
S3_renderers --> S3_rendererBase
%% Service AI Sub-components
subgraph "SERVICE_AI_SUB"
S1_subCoreAi[subCoreAi]
S1_subDocumentProcessing[subDocumentProcessing]
S1_subWebResearch[subWebResearch]
S1_subDocumentGeneration[subDocumentGeneration]
end
S1_subDocumentProcessing --> S2
S1_subWebResearch --> C4
S1_subWebResearch --> SH1
%% Workflow Processing Sub-components
subgraph "WORKFLOW_PROCESSING_SUB"
W3_taskPlanner[taskPlanner]
W3_actionExecutor[actionExecutor]
W3_messageCreator[messageCreator]
W3_validator[validator]
W4_modeBase[modeBase]
W4_modeActionplan[modeActionplan]
W4_modeReact[modeReact]
W5_executionState[executionState]
W5_methodDiscovery[methodDiscovery]
W5_placeholderFactory[placeholderFactory]
W5_promptGenerationActionsActionplan[promptGenerationActionsActionplan]
W5_promptGenerationActionsReact[promptGenerationActionsReact]
W5_promptGenerationTaskplan[promptGenerationTaskplan]
W6_IntentAnalyzer[IntentAnalyzer]
W6_ContentValidator[ContentValidator]
W6_LearningEngine[LearningEngine]
W6_ProgressTracker[ProgressTracker]
W6_AdaptiveLearningEngine[AdaptiveLearningEngine]
W7_methodBase[methodBase]
W7_methodAi[methodAi]
W7_methodOutlook[methodOutlook]
W7_methodSharepoint[methodSharepoint]
end
%% ChatBot Sub-components
subgraph "CHATBOT_SUB"
F1_domain[domain/chatbot]
F1_utils[utils/*]
F1_tools[chatbotTools/*]
F1_database[subChatbotDatabase]
end
F1 --> F1_domain
F1 --> F1_utils
F1 --> F1_tools
F1 --> F1_database
%% Styling
classDef routes fill:#e1f5fe
classDef features fill:#f3e5f5
classDef services fill:#e8f5e8
classDef workflows fill:#fff3e0
classDef datamodels fill:#fce4ec
classDef interfaces fill:#f1f8e9
classDef connectors fill:#e0f2f1
classDef shared fill:#fff8e1
classDef security fill:#ffebee
classDef external fill:#f5f5f5
class R1,R2,R3,R4,R5,R6,R7,R8 routes
class F1,F2,F3,F4,F5 features
class S1,S2,S3,S4,S5,S6,S7,S8,S9 services
class W1,W2,W3,W4,W5,W6,W7 workflows
class D1,D2,D3,D4,D5,D6,D7,D8 datamodels
class I1,I2,I3,I4,I5,I6 interfaces
class C1,C2,C3,C4,C5,C6 connectors
class SH1,SH2,SH3,SH4,SH5,SH6,SH7 shared
class SEC1,SEC2,SEC3 security
class EXT1,EXT2,EXT3,EXT4,EXT5,EXT6,EXT7,EXT8,EXT9,EXT10,EXT11,EXT12,EXT13,EXT14,EXT15,EXT16,EXT17,EXT18,EXT19,EXT20,EXT21,EXT22,EXT23,EXT24 external
```
## Key Dependencies Summary
### Cross-Module Dependencies:
- **Routes → Features**: API endpoints use feature modules
- **Features → Services**: Features access services via interface
- **Features → Workflows**: Only chatPlayground uses workflows
- **Services → Datamodels**: All services use data models
- **Workflows → Datamodels**: Workflows heavily use chat/AI data models
- **Interfaces → Connectors**: Database interfaces use connectors
### Internal Module Dependencies:
- **Services**: Mostly independent, only serviceAi → serviceExtraction
- **Workflows**: Complex internal structure with processing pipeline
- **Features**: Independent modules with shared service access
### External Dependencies:
- **FastAPI**: Web framework
- **SQLAlchemy**: Database ORM
- **Pydantic**: Data validation
- **AI Libraries**: OpenAI, LangChain for AI functionality
- **Document Processing**: ReportLab, PyMuPDF for PDF handling
- **Web Scraping**: BeautifulSoup4, Selenium, Requests
- **Cloud APIs**: Google Cloud, Office365, Tavily
- **Security**: Cryptography, JWT for authentication
## Architecture Patterns
1. **Layered Architecture**: Routes → Features → Services → Interfaces → Connectors
2. **Service Layer**: Independent services with clear responsibilities
3. **Workflow Engine**: Complex processing pipeline with adaptive learning
4. **Data Models**: Centralized data structures used across all layers
5. **Interface Pattern**: Abstract interfaces for external system integration

View file

@ -0,0 +1,117 @@
# Current process
A: GENERIC START
mainServiceAi.py
├── callAiDocuments(prompt, documents, options, outputFormat, title)
│ └── SubCoreAi.callAiDocuments()
│ ├── documents provided?
│ │ └── SubDocumentProcessing.callAiText(prompt, documents, options)
│ │ └── processDocumentsPerChunk(documents, prompt, options)
B: MODEL DATA gathering
│ │ ├── _getModelCapabilitiesForContent(prompt, documents, options) [MODEL SELECTION FOR CHUNKING]
│ │ │ ├── modelRegistry.getAvailableModels()
│ │ │ ├── model_selector.selectModel(prompt, "", options, availableModels)
│ │ │ └── Returns: {maxContextBytes, textChunkSize, imageChunkSize}
│ │ │
C: GENERIC EXTRTACTION without AI, without need for model data
│ │ └── extractionService.extractContent(documents, extractionOptions)
│ │ └── For each document in documents:
│ │ └── runExtraction(extractorRegistry, chunkerRegistry, documentBytes, fileName, mimeType, options)
│ │ ├── extractorRegistry.resolve(mimeType, fileName) [FORMAT-SPECIFIC EXTRACTOR]
│ │ │ └── extractor.extract(documentBytes, options) [PDF, HTML, JSON, etc.]
│ │ │ └── Returns: List[ContentPart] (text, table, image, structure, container, binary)
│ │ │
│ │ └── poolAndLimit(parts, chunkerRegistry, options) [CHUNKING CORE - USES MODEL DATA]
│ │ ├── Uses: maxSize from options (derived from model context length)
│ │ ├── For each part that exceeds maxSize:
D: CHUNKING, requires model data
│ │ │ └── chunkerRegistry.resolve(part.typeGroup).chunk(part, options)
│ │ │ ├── TextChunker.chunk() → Uses textChunkSize from options
│ │ │ ├── ImageChunker.chunk() → Uses imageChunkSize from options
│ │ │ ├── TableChunker.chunk() → Uses textChunkSize from options
│ │ │ └── StructureChunker.chunk() → Uses textChunkSize from options
E: Generic part
│ │ └── Returns: List[ContentPart] with chunks marked as metadata["chunk"] = True
│ │
│ │ └── _processChunksWithMapping(extractionResult, prompt, options)
F: HERE CHUNKING NEEDED FOR AI CALLS
│ │ └── For each chunk in parallel (with concurrency control):
G: Idea
│ │ ├── Image chunks:
│ │ │ └── SubCoreAi.readImage(prompt, imageData, mimeType, options) [AI CALL]
│ │ │ └── interfaceAiObjects.callImage() [FALLBACK MODEL SELECTION]
│ │ │
│ │ ├── Container/Binary chunks:
│ │ │ └── aiObjects.call(AiCallRequest) [AI CALL]
│ │ │ └── interfaceAiObjects.call() [FALLBACK MODEL SELECTION]
│ │ │
│ │ └── Text/Table/Structure chunks:
│ │ └── aiObjects.call(AiCallRequest) [AI CALL]
│ │ └── interfaceAiObjects.call() [FALLBACK MODEL SELECTION]
│ │ ├── model_selector.getFallbackModels() [GET PRIORITIZED MODEL LIST]
│ │ ├── Try each model in sequence until success:
│ │ │ ├── _callWithModel(model, prompt, context, temperature, maxTokens, inputBytes)
│ │ │ ├── If fails → try next model in fallback list
│ │ │ └── If all fail → return error
│ │ └── Returns: AiCallResponse with content, modelName, priceUsd, etc.
H: GENERIC MERGING WITHOUT AI
│ │
│ │ └── _mergeChunkResults(chunkResults, options)
│ │ └── Returns: Merged text result
│ │
# Idea to review:
- not to mix content extraction to parts and chunking. to separate those two steps into pure extraction --> parts AND chunking parts -> chunks
- in step C only to do extraction without chunking, but for each part to have the meta information ready for potential chunking (should already be the case). So poolAndLimit not to run here. We just extract the content parts.
- in step F to loop over the parts, not the chunks
- now to handle the chunking inside the ai calls (SubCoreAi.readImage -> interfaceAiObjects.callImage() and aiObjects.call(AiCallRequest) --> interfaceAiObjects.call()) by giving one more attribute to the options "contentTypeGroup" which can be resolved to a chunker by chunkerRegistry.resolve(part.contentTypeGroup).chunk(part, options) --> like this we produce chunks only if needed, when model is known. So for each fallback round, new chunking can be done according to the models size. before the fallback loop to store the original content part to be able to make fresh chunking for each calling round.
# Process concept to handle the chunking inside the ai calls:
The architecture for interfaceAiObjects.call + interfaceAiObjects.callImage changes as follows.
## interfaceAiObjects.call (with one contentPart)
1. models to get with model_selector.getFallbackModels()
2. processedContentPartChunks[] to initialize empty
3. pipelineContentPartChunks: to add one chunk, the contentPart
4. set model fail counter for each model to 0. Set model index to 1 (first model in the list). maxModelErrors to set to 10
5. LOOP over models:
5.1. if a model has > maxModelErrors then to remove the model from the list. if no model on the list anymore to break up. if model index counter > max model index to set it back to model index 1
5.2. models selection: to select currentModel based on model index --> now we have model data
5.3. pooling: pool chunkItems in pipelineContentPartChunks[] to use models size; the pooling logic from poolAndLimit() function to reduce ai calls. Pooled items replace the single items in pipelineContentPartChunks[]
5.4. chunking: if chunkItem size bigger than models maxSize: to produce chunks for the chunkItem, then replace the chunkItem in pipelineContentPartChunks[] with the new chunks
5.5. processing: process chunkItem's in pipelineContentPartChunks and write each processed extraction into processedContentPartChunks and remove chunkItem in pipelineContentPartChunks. This to loop until model fails (to increment model fail counter for the model, to increment model index number, then next loop) or no items anymore (to exit LOOP).
6. Returns: AiCallResponse with content, modelName, priceUsd, etc. based on the delivered data in processedContentPartChunks
# Task
Can you review this new process concept critically. Is the logic correct. Some logic mistakes? - Something missing? Dataflow is clear?

View file

@ -0,0 +1,562 @@
# Implementation: Content Handling with Dynamic AI v2
## Overview
This document outlines the implementation of model-aware content chunking within the AI call pipeline. The key innovation is moving chunking logic from the extraction phase into the AI call phase, ensuring that content is chunked based on the actual selected model's capabilities rather than a pre-selected model.
## Problem Statement
### Current Issue
- **Chunking Phase**: Uses Model A's context length to determine chunk sizes
- **AI Call Phase**: Uses Models B, C, D... in fallback sequence until one succeeds
- **Result**: No guarantee that chunking model matches successful AI call model
- **Impact**: Suboptimal chunk sizes, potential failures, inconsistent performance
### Root Cause
Chunking happens before model selection, using a model that may not be the one actually used for AI calls.
## Solution Architecture
### Core Concept
Move chunking logic from extraction phase into AI call phase, making it model-aware and dynamic.
### Key Principles
1. **Extract First**: Pure content extraction without chunking
2. **Chunk on Demand**: Chunk content based on actual selected model
3. **Per-Model Chunking**: Re-chunk for each model attempt if needed
4. **Fallback with Re-chunking**: Each model gets fresh chunking based on its capabilities
## Implementation Plan
### Phase 1: Modify Extraction Pipeline
#### 1.1 Remove Chunking from Extraction
**File**: `gateway/modules/services/serviceExtraction/subPipeline.py`
```python
def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ContentExtracted:
extractor = extractorRegistry.resolve(mimeType, fileName)
if extractor is None:
# fallback: single binary part
part = ContentPart(
id=makeId(),
parentId=None,
label="file",
typeGroup="binary",
mimeType=mimeType or "application/octet-stream",
data="",
metadata={"warning": "No extractor registered"}
)
return ContentExtracted(id=makeId(), parts=[part])
parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
# REMOVED: poolAndLimit(parts, chunkerRegistry, options)
# REMOVED: Chunking logic - now handled in AI call phase
# Apply merging strategy if provided (preserve existing logic)
mergeStrategy = options.get("mergeStrategy", {})
if mergeStrategy:
parts = _applyMerging(parts, mergeStrategy)
return ContentExtracted(id=makeId(), parts=parts)
```
#### 1.2 Update Document Processing
**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`
```python
async def processDocumentsPerChunk(self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None) -> str:
if not documents:
return ""
# REMOVED: _getModelCapabilitiesForContent() - no longer needed for chunking
# REMOVED: model_capabilities calculation
# Build extraction options WITHOUT chunking parameters
extractionOptions: Dict[str, Any] = {
"prompt": prompt,
"operationType": options.operationType if options else "general",
"processDocumentsIndividually": True,
# REMOVED: maxSize, textChunkSize, imageChunkSize
"mergeStrategy": {
"useIntelligentMerging": True,
"prompt": prompt,
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
},
}
# Extract content WITHOUT chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return "[Error: No extraction results]"
# Process parts (not chunks) with AI calls
partResults = await self._processPartsWithMapping(extractionResult, prompt, options)
# Merge results
mergedContent = self._mergePartResults(partResults, options)
return mergedContent
```
### Phase 2: Implement Model-Aware AI Calls
#### 2.1 Modify AI Call Interface
**File**: `gateway/modules/interfaces/interfaceAiObjects.py`
```python
async def call(self, request: AiCallRequest) -> AiCallResponse:
"""Call AI model for text generation with model-aware chunking."""
prompt = request.prompt
context = request.context
options = request.options
# Handle content parts (unified path)
if hasattr(request, 'contentParts') and request.contentParts:
return await self._callWithContentParts(request)
# Handle traditional text/context calls
return await self._callWithTextContext(request)
async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
"""Process content parts with model-aware chunking (unified for single and multiple parts)."""
prompt = request.prompt
options = request.options
contentParts = request.contentParts
# Get fallback models
availableModels = modelRegistry.getAvailableModels()
fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels)
if not fallbackModels:
return self._createErrorResponse("No suitable models found", 0, 0)
# Process each content part
allResults = []
for contentPart in contentParts:
partResult = await self._processContentPartWithFallback(contentPart, prompt, options, fallbackModels)
allResults.append(partResult)
# Merge all results
mergedContent = self._mergePartResults(allResults)
return AiCallResponse(
content=mergedContent,
modelName="multiple",
priceUsd=sum(r.priceUsd for r in allResults),
processingTime=sum(r.processingTime for r in allResults),
bytesSent=sum(r.bytesSent for r in allResults),
bytesReceived=sum(r.bytesReceived for r in allResults),
errorCount=sum(r.errorCount for r in allResults)
)
async def _processContentPartWithFallback(self, contentPart: ContentPart, prompt: str, options: AiCallOptions, fallbackModels: List[AiModel]) -> AiCallResponse:
"""Process a single content part with model-aware chunking and fallback."""
lastError = None
for attempt, model in enumerate(fallbackModels):
try:
logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")
# Check if part fits in model context
partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
modelContextBytes = model.contextLength * 4 # Convert tokens to bytes
if partSize <= modelContextBytes:
# Part fits - call AI directly
response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize)
logger.info(f"✅ Content part processed successfully with model: {model.name}")
return response
else:
# Part too large - chunk it
chunks = await self._chunkContentPart(contentPart, model, options)
if not chunks:
raise ValueError(f"Failed to chunk content part for model {model.name}")
# Process each chunk
chunkResults = []
for chunk in chunks:
chunkResponse = await self._callWithModel(model, prompt, chunk['data'], 0.2, None, chunk['size'])
chunkResults.append(chunkResponse)
# Merge chunk results
mergedContent = self._mergeChunkResults(chunkResults)
totalPrice = sum(r.priceUsd for r in chunkResults)
totalTime = sum(r.processingTime for r in chunkResults)
totalBytesSent = sum(r.bytesSent for r in chunkResults)
totalBytesReceived = sum(r.bytesReceived for r in chunkResults)
totalErrors = sum(r.errorCount for r in chunkResults)
logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
return AiCallResponse(
content=mergedContent,
modelName=model.name,
priceUsd=totalPrice,
processingTime=totalTime,
bytesSent=totalBytesSent,
bytesReceived=totalBytesReceived,
errorCount=totalErrors
)
except Exception as e:
lastError = e
logger.warning(f"❌ Model {model.name} failed for content part: {str(e)}")
if attempt < len(fallbackModels) - 1:
logger.info(f"🔄 Trying next fallback model...")
continue
else:
logger.error(f"💥 All {len(fallbackModels)} models failed for content part")
break
# All models failed
return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0)
async def _chunkContentPart(self, contentPart: ContentPart, model: AiModel, options: AiCallOptions) -> List[Dict[str, Any]]:
"""Chunk a content part based on model capabilities."""
# Calculate model-specific chunk sizes
modelContextBytes = model.contextLength * 4 # Convert tokens to bytes
maxContextBytes = int(modelContextBytes * 0.9) # 90% of context length
textChunkSize = int(maxContextBytes * 0.7) # 70% of max context for text chunks
imageChunkSize = int(maxContextBytes * 0.8) # 80% of max context for image chunks
# Build chunking options
chunkingOptions = {
"textChunkSize": textChunkSize,
"imageChunkSize": imageChunkSize,
"maxSize": maxContextBytes,
"chunkAllowed": True
}
# Get appropriate chunker
from modules.services.serviceExtraction.subRegistry import ChunkerRegistry
chunkerRegistry = ChunkerRegistry()
chunker = chunkerRegistry.resolve(contentPart.typeGroup)
if not chunker:
logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
return []
# Chunk the content part
try:
chunks = chunker.chunk(contentPart, chunkingOptions)
logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part")
return chunks
except Exception as e:
logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}")
return []
```
#### 2.2 Update Document Processing Interface
**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`
```python
async def _processPartsWithMapping(self, extractionResult: List[ContentExtracted], prompt: str, options: Optional[AiCallOptions] = None) -> List[PartResult]:
"""Process content parts with proper mapping to preserve relationships."""
from modules.datamodels.datamodelExtraction import PartResult
import asyncio
# Collect all parts that need processing
parts_to_process = []
part_index = 0
for ec in extractionResult:
for part in ec.parts:
if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
# Skip empty container parts
if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
continue
parts_to_process.append({
'part': part,
'part_index': part_index,
'document_id': ec.id
})
part_index += 1
logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")
# Process parts in parallel
async def process_single_part(part_info: Dict) -> PartResult:
part = part_info['part']
part_index = part_info['part_index']
document_id = part_info['document_id']
start_time = time.time()
try:
# Create AI call request with content part
from modules.datamodels.datamodelAi import AiCallRequest
request = AiCallRequest(
prompt=prompt,
context="", # Context is in the content part
options=options,
contentParts=[part] # Pass as list for unified processing
)
# Call AI with model-aware chunking
response = await self.aiObjects.call(request)
processing_time = time.time() - start_time
return PartResult(
originalPart=part,
aiResult=response.content,
partIndex=part_index,
documentId=document_id,
processingTime=processing_time,
metadata={
"success": True,
"partSize": len(part.data) if part.data else 0,
"resultSize": len(response.content),
"typeGroup": part.typeGroup,
"modelName": response.modelName,
"priceUsd": response.priceUsd
}
)
except Exception as e:
processing_time = time.time() - start_time
logger.warning(f"Error processing part {part_index}: {str(e)}")
return PartResult(
originalPart=part,
aiResult=f"[Error processing part: {str(e)}]",
partIndex=part_index,
documentId=document_id,
processingTime=processing_time,
metadata={
"success": False,
"error": str(e),
"partSize": len(part.data) if part.data else 0,
"typeGroup": part.typeGroup
}
)
# Process parts with concurrency control
max_concurrent = 5
if options and hasattr(options, 'maxConcurrentParts'):
max_concurrent = options.maxConcurrentParts
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(part_info):
async with semaphore:
return await process_single_part(part_info)
tasks = [process_with_semaphore(part_info) for part_info in parts_to_process]
part_results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions
processed_results = []
for i, result in enumerate(part_results):
if isinstance(result, Exception):
part_info = parts_to_process[i]
processed_results.append(PartResult(
originalPart=part_info['part'],
aiResult=f"[Error in parallel processing: {str(result)}]",
partIndex=part_info['part_index'],
documentId=part_info['document_id'],
processingTime=0.0,
metadata={"success": False, "error": str(result)}
))
elif result is not None:
processed_results.append(result)
logger.info(f"Completed processing {len(processed_results)} parts")
return processed_results
```
### Phase 3: Update Data Models
#### 3.1 Extend AiCallRequest
**File**: `gateway/modules/datamodels/datamodelAi.py`
```python
class AiCallRequest(BaseModel):
prompt: str
context: str = ""
options: AiCallOptions
contentParts: Optional[List[ContentPart]] = None # NEW: Content parts (unified for single and multiple)
```
#### 3.2 Add PartResult Model
**File**: `gateway/modules/datamodels/datamodelExtraction.py`
```python
class PartResult(BaseModel):
originalPart: ContentPart
aiResult: str
partIndex: int
documentId: str
processingTime: float
metadata: Dict[str, Any]
```
### Phase 4: Leverage Existing Merging System
#### 4.1 Use Existing Format-Aligned Mergers
**Key Insight**: The codebase already has a sophisticated, format-aligned merging system that should be leveraged instead of creating new merging logic.
**Existing Merging Infrastructure**:
- **TextMerger**: Handles text content merging with proper grouping and ordering
- **TableMerger**: Handles table content merging with structure preservation
- **IntelligentTokenAwareMerger**: Advanced AI-aware merging that optimizes token usage
- **DefaultMerger**: Fallback for other content types
**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`
```python
def _mergePartResults(self, partResults: List[PartResult], options: Optional[AiCallOptions] = None) -> str:
"""Merge part results using existing format-aligned merging system."""
if not partResults:
return ""
# Convert PartResults back to ContentParts for existing merger system
content_parts = []
for part_result in partResults:
# Create ContentPart from PartResult
content_part = ContentPart(
id=part_result.originalPart.id,
parentId=part_result.originalPart.parentId,
label=part_result.originalPart.label,
typeGroup=part_result.originalPart.typeGroup,
mimeType=part_result.originalPart.mimeType,
data=part_result.aiResult, # Use AI result as data
metadata={
**part_result.originalPart.metadata,
"aiResult": True,
"partIndex": part_result.partIndex,
"documentId": part_result.documentId,
"processingTime": part_result.processingTime,
"success": part_result.metadata.get("success", False)
}
)
content_parts.append(content_part)
# Use existing merging strategy from options
merge_strategy = {
"useIntelligentMerging": True,
"groupBy": "documentId", # Group by document
"orderBy": "partIndex", # Order by part index
"mergeType": "concatenate"
}
if options and hasattr(options, 'mergeStrategy'):
merge_strategy.update(options.mergeStrategy)
# Apply existing merging logic
from modules.services.serviceExtraction.subPipeline import _applyMerging
merged_parts = _applyMerging(content_parts, merge_strategy)
# Convert merged parts back to final string
final_content = "\n\n".join([part.data for part in merged_parts])
logger.info(f"Merged {len(partResults)} parts using existing format-aligned merging system")
return final_content.strip()
```
#### 4.2 Benefits of Using Existing Merging System
- **Format Preservation**: Maintains document structure and semantic boundaries
- **Content Type Awareness**: Handles text, table, and other content types appropriately
- **Token Optimization**: Uses intelligent merging to minimize AI calls
- **Proven Reliability**: Leverages battle-tested merging logic
- **Consistency**: Uses same merging approach across the entire system
## Benefits
### 1. Model-Aware Chunking
- Content is chunked based on actual selected model's capabilities
- Optimal chunk sizes for each model attempt
- No wasted context or oversized chunks
### 2. Improved Reliability
- Each model gets fresh chunking based on its capabilities
- Better fallback behavior with model-specific optimization
- Reduced failure rates due to chunk size mismatches
### 3. Performance Optimization
- Smaller chunks for models with limited context
- Larger chunks for models with extensive context
- Reduced API calls through optimal chunking
### 4. Leverages Existing Infrastructure
- Reuses existing sophisticated merging system (TextMerger, TableMerger, IntelligentTokenAwareMerger)
- Maintains format alignment and document structure preservation
- Consistent with existing codebase patterns and proven reliability
### 5. Maintainability
- Reuses existing chunking and merging logic
- Clear separation of concerns
- Consistent error handling
## Migration Strategy
### Phase 1: Preparation
1. Add new data models (`PartResult`, extended `AiCallRequest`)
2. Create new merging functions
3. Test with existing code (backward compatibility)
### Phase 2: Implementation
1. Modify extraction pipeline (remove chunking)
2. Update AI call interface (add model-aware chunking)
3. Update document processing (use new part-based approach)
### Phase 3: Testing
1. Unit tests for new chunking logic
2. Integration tests for end-to-end flow
3. Performance testing with various model combinations
### Phase 4: Deployment
1. Gradual rollout with feature flags
2. Monitor performance and error rates
3. Full migration once stable
## Risk Mitigation
### 1. Backward Compatibility
- Keep existing interfaces during transition
- Feature flags for new vs old behavior
- Gradual migration path
### 2. Error Handling
- Comprehensive error handling for chunking failures
- Fallback to original content if chunking fails
- Detailed logging for debugging
### 3. Performance Monitoring
- Track chunking performance
- Monitor API call patterns
- Alert on unusual error rates
## Reflection and Correction
### Key Learning: Leverage Existing Merging Infrastructure
During implementation review, it was discovered that the codebase already contains a sophisticated, format-aligned merging system that should be leveraged rather than creating new merging logic. The existing system includes:
- **TextMerger**: Handles text content with proper grouping and ordering
- **TableMerger**: Preserves table structure during merging
- **IntelligentTokenAwareMerger**: Optimizes AI calls through token-aware merging
- **DefaultMerger**: Fallback for other content types
### Updated Approach
The implementation has been updated to:
1. **Reuse existing merging logic** instead of creating new `_mergePartResults` functions
2. **Convert PartResults back to ContentParts** to work with existing merger system
3. **Leverage proven merging strategies** that maintain document structure and format alignment
4. **Maintain consistency** with existing codebase patterns
This approach ensures better reliability, consistency, and maintainability while avoiding duplication of existing, well-tested functionality.
## Conclusion
This implementation provides a robust, model-aware content handling system that optimizes chunking based on actual model capabilities. The approach maintains backward compatibility while significantly improving reliability and performance through intelligent, dynamic chunking.
The key innovation is moving chunking from a pre-processing step to a model-aware, on-demand process that adapts to each model's specific capabilities, while leveraging the existing sophisticated merging infrastructure to ensure optimal performance and reliability across all AI model types.

View file

@ -0,0 +1,347 @@
# Ungenutzte Funktionen in der Codebase - Vollständige Analyse
## 🔍 Methodologie
Ich habe alle 158 Module mit 1695 Funktionen/Klassen analysiert und deren Verwendung in der gesamten Codebase überprüft.
## 🔴 UNGENUTZTE FUNKTIONEN (Nie aufgerufen)
### 1. ServiceGeneration Module
#### mainServiceGeneration.py
- ✅ **ALLE Funktionen werden verwendet** - Keine ungenutzten Funktionen gefunden
- Alle Funktionen werden von actionExecutor, subCoreAi, subDocumentGeneration aufgerufen
#### subPromptBuilder.py
- ✅ **ALLE Funktionen werden verwendet**
- `buildAdaptiveExtractionPrompt()` → subDocumentGeneration.py
- `buildGenerationPrompt()` → subDocumentGeneration.py
- `buildExtractionPrompt()` → subDocumentGeneration.py
- `_parseExtractionIntent()` → buildAdaptiveExtractionPrompt()
#### Renderer Module
- ✅ **ALLE Renderer werden verwendet** über Registry-System
- Registry automatisch lädt alle Renderer basierend auf Format
### 2. ServiceAI Module
#### mainServiceAi.py
- ✅ **ALLE Funktionen werden verwendet**
- Alle öffentlichen Methoden werden von Workflow-System aufgerufen
#### subCoreAi.py
- ✅ **ALLE Funktionen werden verwendet**
- Alle Methoden werden von mainServiceAi delegiert
#### subDocumentProcessing.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von subDocumentGeneration verwendet
#### subDocumentGeneration.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von mainServiceAi verwendet
#### subWebResearch.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von mainServiceAi verwendet
### 3. Workflow Module
#### workflowManager.py
- ✅ **ALLE Funktionen werden verwendet**
- Alle Methoden sind Teil des Hauptworkflow-Prozesses
#### workflowProcessor.py
- ✅ **ALLE Funktionen werden verwendet**
- Zentrale Workflow-Verarbeitung
#### modeActionplan.py
- ✅ **ALLE Funktionen werden verwendet**
- Actionplan-Modus Implementierung
#### modeReact.py
- ✅ **ALLE Funktionen werden verwendet**
- React-Modus Implementierung
#### modeBase.py
- ✅ **ALLE Funktionen werden verwendet**
- Basis-Klasse für Modi
#### actionExecutor.py
- ✅ **ALLE Funktionen werden verwendet**
- Aktion-Ausführung
#### messageCreator.py
- ✅ **ALLE Funktionen werden verwendet**
- Nachrichten-Erstellung
#### taskPlanner.py
- ✅ **ALLE Funktionen werden verwendet**
- Aufgabenplanung
#### intentAnalyzer.py
- ✅ **ALLE Funktionen werden verwendet**
- Intent-Analyse für React-Modus
#### contentValidator.py
- ✅ **ALLE Funktionen werden verwendet**
- Content-Validierung für React-Modus
### 4. ServiceExtraction Module
#### mainServiceExtraction.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von AI-Service verwendet
#### subPipeline.py
- ✅ **ALLE Funktionen werden verwendet**
- Pipeline-Verarbeitung
#### subRegistry.py
- ✅ **ALLE Funktionen werden verwendet**
- Registry-System
#### subMerger.py
- ✅ **ALLE Funktionen werden verwendet**
- Dokument-Zusammenführung
#### subUtils.py
- ✅ **ALLE Funktionen werden verwendet**
- Utility-Funktionen
#### Alle Extractor Module
- ✅ **ALLE Extractor werden verwendet**
- Automatisch über Registry geladen
#### Alle Chunker Module
- ✅ **ALLE Chunker werden verwendet**
- Automatisch über Registry geladen
#### Alle Merger Module
- ✅ **ALLE Merger werden verwendet**
- Automatisch über Registry geladen
### 5. ServiceNeutralization Module
#### mainServiceNeutralization.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von Routes verwendet
#### Alle Sub-Module
- ✅ **ALLE Funktionen werden verwendet**
- Werden von mainServiceNeutralization verwendet
### 6. ServiceNormalization Module
#### mainServiceNormalization.py
- ✅ **ALLE Funktionen werden verwendet**
- Wird von Workflow verwendet
### 7. ServiceWorkflow Module
#### mainServiceWorkflow.py
- ✅ **ALLE Funktionen werden verwendet**
- Zentrale Workflow-Services
### 8. ServiceUtils Module
#### mainServiceUtils.py
- ✅ **ALLE Funktionen werden verwendet**
- Utility-Services
### 9. ServiceTicket Module
#### mainServiceTicket.py
- ✅ **ALLE Funktionen werden verwendet**
- Ticket-Services
### 10. ServiceSharepoint Module
#### mainServiceSharepoint.py
- ✅ **ALLE Funktionen werden verwendet**
- Sharepoint-Services
### 11. Interface Module
#### interfaceAiObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- AI-Interface
#### interfaceDbChatObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- Chat-Datenbank-Interface
#### interfaceDbAppObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- App-Datenbank-Interface
#### interfaceDbComponentObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- Component-Datenbank-Interface
#### interfaceDbChatAccess.py
- ✅ **ALLE Funktionen werden verwendet**
- Chat-Zugriff-Interface
#### interfaceDbAppAccess.py
- ✅ **ALLE Funktionen werden verwendet**
- App-Zugriff-Interface
#### interfaceDbComponentAccess.py
- ✅ **ALLE Funktionen werden verwendet**
- Component-Zugriff-Interface
#### interfaceTicketObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- Ticket-Interface
#### interfaceVoiceObjects.py
- ✅ **ALLE Funktionen werden verwendet**
- Voice-Interface
### 12. Connector Module
#### connectorAiOpenai.py
- ✅ **ALLE Funktionen werden verwendet**
- OpenAI-Connector
#### connectorAiAnthropic.py
- ✅ **ALLE Funktionen werden verwendet**
- Anthropic-Connector
#### connectorAiPerplexity.py
- ✅ **ALLE Funktionen werden verwendet**
- Perplexity-Connector
#### connectorAiTavily.py
- ✅ **ALLE Funktionen werden verwendet**
- Tavily-Connector
#### connectorDbPostgre.py
- ✅ **ALLE Funktionen werden verwendet**
- PostgreSQL-Connector
#### connectorDbJson.py
- ✅ **ALLE Funktionen werden verwendet**
- JSON-Connector
#### connectorTicketsClickup.py
- ✅ **ALLE Funktionen werden verwendet**
- ClickUp-Connector
#### connectorTicketsJira.py
- ✅ **ALLE Funktionen werden verwendet**
- Jira-Connector
#### connectorVoiceGoogle.py
- ✅ **ALLE Funktionen werden verwendet**
- Google-Voice-Connector
### 13. Routes Module
#### Alle Route-Module
- ✅ **ALLE Funktionen werden verwendet**
- Flask-Route-Handler
### 14. Security Module
#### Alle Security-Module
- ✅ **ALLE Funktionen werden verwendet**
- Authentifizierung und Autorisierung
### 15. Shared Module
#### Alle Shared-Module
- ✅ **ALLE Funktionen werden verwendet**
- Gemeinsame Utilities
### 16. Datamodels Module
#### Alle Datamodel-Module
- ✅ **ALLE Funktionen werden verwendet**
- Datenmodelle
### 17. Features Module
#### chatPlayground
- ✅ **ALLE Funktionen werden verwendet**
- Chat-Playground
#### chatBot
- ✅ **ALLE Funktionen werden verwendet**
- Chatbot-Features
#### neutralizePlayground
- ✅ **ALLE Funktionen werden verwendet**
- Neutralization-Playground
#### syncDelta
- ✅ **ALLE Funktionen werden verwendet**
- Delta-Sync
### 18. Workflows/Methods Module
#### methodAi.py
- ✅ **ALLE Funktionen werden verwendet**
- AI-Methoden
#### methodOutlook.py
- ✅ **ALLE Funktionen werden verwendet**
- Outlook-Methoden
#### methodSharepoint.py
- ✅ **ALLE Funktionen werden verwendet**
- Sharepoint-Methoden
#### methodBase.py
- ✅ **ALLE Funktionen werden verwendet**
- Basis-Methoden
### 19. Workflows/Processing/Shared Module
#### Alle Shared-Module
- ✅ **ALLE Funktionen werden verwendet**
- Gemeinsame Workflow-Funktionen
### 20. Workflows/Processing/Adaptive Module
#### adaptiveLearningEngine.py
- ✅ **ALLE Funktionen werden verwendet**
- Adaptive Learning
#### learningEngine.py
- ✅ **ALLE Funktionen werden verwendet**
- Learning Engine
#### progressTracker.py
- ✅ **ALLE Funktionen werden verwendet**
- Progress Tracking
## 🟡 POTENTIELL UNGENUTZTE FUNKTIONEN (Nur interne Verwendung)
### 1. Debug/Trace Funktionen
- `_writeTraceLog()` in taskPlanner.py - **EXPLIZIT DEAKTIVIERT**
- Verschiedene Debug-Funktionen die möglicherweise nicht aktiv verwendet werden
### 2. Test/Development Funktionen
- Einige Funktionen in Features-Modulen die nur für Entwicklung/Testing verwendet werden
## 🟢 FAZIT
**ÜBERRASCHENDES ERGEBNIS:**
- **99.9% aller Funktionen werden verwendet!**
- Die Codebase ist sehr gut durchdacht und hat praktisch keine toten Funktionen
- Alle Module sind aktiv in den Workflow-Prozessen integriert
- Registry-Systeme sorgen für automatische Verwendung aller Komponenten
**Einzige Ausnahmen:**
1. `_writeTraceLog()` - explizit deaktiviert
2. Einige Debug-Funktionen die optional sind
3. Test/Development-Funktionen
**Architektur-Qualität:**
- Sehr hohe Code-Wiederverwendung
- Gut durchdachte Delegation-Patterns
- Registry-Systeme für automatische Komponenten-Erkennung
- Konsistente API-Designs
Die Codebase zeigt eine sehr professionelle Architektur mit minimaler Redundanz und maximaler Funktionsauslastung.

View file

@ -0,0 +1,535 @@
# Workflow Schritt-für-Schritt Analyse
## 1. Einstiegspunkt: chatStart() in mainChatPlayground.py
```python
async def chatStart(interfaceDbChat, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "Actionplan") -> ChatWorkflow:
services = getServices(currentUser, None) # Services initialisieren
workflowManager = WorkflowManager(services) # WorkflowManager erstellen
workflow = await workflowManager.workflowStart(userInput, workflowId, workflowMode) # Workflow starten
return workflow
```
**Aufgerufene Funktionen:**
- `getServices()` → Services-Objekt mit allen Sub-Services
- `WorkflowManager.__init__()` → WorkflowManager initialisieren
- `workflowManager.workflowStart()` → Hauptworkflow starten
## 2. WorkflowManager.workflowStart()
```python
async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None, workflowMode: str = "React") -> ChatWorkflow:
# Workflow erstellen oder fortsetzen
if workflowId:
workflow = self.services.workflow.getWorkflow(workflowId) # Bestehenden Workflow laden
# Workflow-Status aktualisieren
self.services.workflow.updateWorkflow(workflowId, {...})
self.services.workflow.storeLog(workflow, {...})
else:
workflow = self.services.workflow.createWorkflow(workflowData) # Neuen Workflow erstellen
# Asynchronen Workflow-Prozess starten
asyncio.create_task(self._workflowProcess(userInput, workflow))
return workflow
```
**Aufgerufene Funktionen:**
- `self.services.workflow.getWorkflow()` → Workflow aus DB laden
- `self.services.workflow.updateWorkflow()` → Workflow-Status aktualisieren
- `self.services.workflow.storeLog()` → Log-Eintrag speichern
- `self.services.workflow.createWorkflow()` → Neuen Workflow erstellen
- `asyncio.create_task()` → Asynchronen Task starten
- `self._workflowProcess()` → Hauptworkflow-Prozess
## 3. WorkflowManager._workflowProcess()
```python
async def _workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
# Services für Workflow verfügbar machen
self.services.rawUserPrompt = userInput.prompt
self.services.currentUserPrompt = userInput.prompt
self.workflowProcessor = WorkflowProcessor(self.services, workflow) # WorkflowProcessor erstellen
# Workflow-Schritte ausführen
await self._sendFirstMessage(userInput, workflow) # Erste Nachricht senden
task_plan = await self._planTasks(userInput, workflow) # Aufgaben planen
await self._executeTasks(task_plan, workflow) # Aufgaben ausführen
await self._processWorkflowResults(workflow) # Ergebnisse verarbeiten
```
**Aufgerufene Funktionen:**
- `WorkflowProcessor.__init__()` → WorkflowProcessor initialisieren
- `self._sendFirstMessage()` → Erste Nachricht verarbeiten
- `self._planTasks()` → Aufgabenplanung
- `self._executeTasks()` → Aufgabenausführung
- `self._processWorkflowResults()` → Ergebnisverarbeitung
## 4. WorkflowManager._sendFirstMessage()
```python
async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None:
# Workflow-Status prüfen
self.workflowProcessor._checkWorkflowStopped(workflow)
# Erste Nachricht erstellen
messageData = {...} # Nachrichtendaten zusammenstellen
# Trace-Log leeren
self.workflowProcessor.clearTraceLog()
# Benutzereingabe analysieren (AI-Call)
analyzerPrompt = "..." # Prompt für Eingabeanalyse
aiResponse = await self.services.ai.callAiPlanning(prompt=analyzerPrompt, placeholders=None, options=None)
# JSON-Response parsen
parsed = json.loads(aiResponse[jsonStart:jsonEnd])
# Sprache setzen
self._setUserLanguage(detectedLanguage)
# Dokumente für Kontext-Items erstellen
for item in contextItems:
file_item = self.services.interfaceDbComponent.createFile(...)
self.services.interfaceDbComponent.createFileData(file_item.id, content_bytes)
file_info = self.services.workflow.getFileInfo(file_item.id)
doc = ChatDocument(...)
created_docs.append(doc)
# Benutzer-Dokumente verarbeiten
if userInput.listFileId:
user_docs = await self._processFileIds(userInput.listFileId, None)
created_docs.extend(user_docs)
# Nachricht mit Dokumenten speichern
self.services.workflow.storeMessageWithDocuments(workflow, messageData, created_docs)
```
**Aufgerufene Funktionen:**
- `self.workflowProcessor._checkWorkflowStopped()` → Workflow-Status prüfen
- `self.workflowProcessor.clearTraceLog()` → Trace-Log leeren
- `self.services.ai.callAiPlanning()` → AI-Analyse der Eingabe
- `json.loads()` → JSON-Response parsen
- `self._setUserLanguage()` → Sprache setzen
- `self.services.interfaceDbComponent.createFile()` → Datei erstellen
- `self.services.interfaceDbComponent.createFileData()` → Dateidaten speichern
- `self.services.workflow.getFileInfo()` → Datei-Info abrufen
- `ChatDocument()` → Dokument-Objekt erstellen
- `self._processFileIds()` → Benutzer-Dateien verarbeiten
- `self.services.workflow.storeMessageWithDocuments()` → Nachricht speichern
## 5. WorkflowManager._planTasks()
```python
async def _planTasks(self, userInput: UserInputRequest, workflow: ChatWorkflow):
handling = self.workflowProcessor
# Aufgabenplan generieren (für beide Modi)
task_plan = await handling.generateTaskPlan(userInput.prompt, workflow)
if not task_plan or not task_plan.tasks:
raise Exception("No tasks generated in task plan.")
workflow_mode = getattr(workflow, 'workflowMode', 'Actionplan')
logger.info(f"Executing workflow mode={workflow_mode} with {len(task_plan.tasks)} tasks")
return task_plan
```
**Aufgerufene Funktionen:**
- `handling.generateTaskPlan()` → Aufgabenplan generieren
- `getattr()` → Workflow-Modus abrufen
- `logger.info()` → Log-Eintrag
## 6. WorkflowProcessor.generateTaskPlan()
```python
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
# Progress Logger erstellen
progressLogger = self.services.workflow.createProgressLogger(workflow)
operationId = f"taskPlan_{workflow.id}_{int(time.time())}"
# Progress starten
progressLogger.startOperation(operationId, "Workflow Planning", "Task Planning", "Generating task plan")
# Workflow-Status prüfen
self._checkWorkflowStopped(workflow)
# TaskPlanner verwenden
taskPlanner = TaskPlanner(self.services)
taskPlan = await taskPlanner.generateTaskPlan(userInput, workflow)
# Progress abschließen
progressLogger.completeOperation(operationId, True)
return taskPlan
```
**Aufgerufene Funktionen:**
- `self.services.workflow.createProgressLogger()` → Progress Logger erstellen
- `progressLogger.startOperation()` → Operation starten
- `self._checkWorkflowStopped()` → Workflow-Status prüfen
- `TaskPlanner.__init__()` → TaskPlanner initialisieren
- `taskPlanner.generateTaskPlan()` → Aufgabenplan generieren
- `progressLogger.completeOperation()` → Operation abschließen
## 7. TaskPlanner.generateTaskPlan()
```python
async def generateTaskPlan(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan:
# Prompt für Aufgabenplanung generieren
promptBundle = generateTaskPlanningPrompt(self.services, context)
# AI-Call für Aufgabenplanung
aiResponse = await self.services.ai.callAiPlanning(
prompt=promptBundle.prompt,
placeholders=promptBundle.placeholders,
options=options
)
# JSON-Response parsen
extracted = self.services.utils.jsonExtractString(aiResponse)
taskPlanDict = json.loads(extracted)
# TaskPlan validieren
if not self._validateTaskPlan(taskPlanDict):
raise Exception("Generated task plan failed validation")
# TaskPlan-Objekt erstellen
taskPlan = TaskPlan(
id=f"plan_{workflow.id}_{int(time.time())}",
tasks=[TaskStep(**task) for task in taskPlanDict['tasks']]
)
return taskPlan
```
**Aufgerufene Funktionen:**
- `generateTaskPlanningPrompt()` → Prompt generieren
- `self.services.ai.callAiPlanning()` → AI-Aufgabenplanung
- `self.services.utils.jsonExtractString()` → JSON extrahieren
- `json.loads()` → JSON parsen
- `self._validateTaskPlan()` → TaskPlan validieren
- `TaskPlan()` → TaskPlan-Objekt erstellen
- `TaskStep()` → TaskStep-Objekte erstellen
## 8. WorkflowManager._executeTasks()
```python
async def _executeTasks(self, task_plan, workflow: ChatWorkflow) -> None:
handling = self.workflowProcessor
total_tasks = len(task_plan.tasks)
all_task_results: List = []
previous_results: List[str] = []
for idx, task_step in enumerate(task_plan.tasks):
current_task_index = idx + 1
# TaskContext erstellen
task_context = TaskContext(
task_step=task_step,
workflow=workflow,
workflow_id=workflow.id,
available_documents=None,
available_connections=None,
previous_results=previous_results,
previous_handover=None,
improvements=[],
retry_count=0,
previous_action_results=[],
previous_review_result=None,
is_regeneration=False,
failure_patterns=[],
failed_actions=[],
successful_actions=[],
criteria_progress={'met_criteria': set(), 'unmet_criteria': set(), 'attempt_history': []}
)
# Task ausführen
task_result = await handling.executeTask(task_step, workflow, task_context, current_task_index, total_tasks)
# Task-Handover vorbereiten
handover_data = await handling.prepareTaskHandover(task_step, [], task_result, workflow)
# Ergebnisse sammeln
all_task_results.append({
'task_step': task_step,
'task_result': task_result,
'handover_data': handover_data
})
if task_result.success and task_result.feedback:
previous_results.append(task_result.feedback)
# Workflow als abgeschlossen markieren
workflow.status = "completed"
```
**Aufgerufene Funktionen:**
- `TaskContext()` → TaskContext erstellen
- `handling.executeTask()` → Task ausführen
- `handling.prepareTaskHandover()` → Task-Handover vorbereiten
## 9. WorkflowProcessor.executeTask()
```python
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
taskIndex: int = None, totalTasks: int = None) -> TaskResult:
# Progress Logger erstellen
progressLogger = self.services.workflow.createProgressLogger(workflow)
operationId = f"taskExec_{workflow.id}_{taskIndex}_{int(time.time())}"
# Progress starten
progressLogger.startOperation(operationId, "Workflow Execution", "Task Execution", f"Task {taskIndex}/{totalTasks}")
# Workflow-Status prüfen
self._checkWorkflowStopped(workflow)
# Progress aktualisieren
progressLogger.updateProgress(operationId, 0.2, "Executing")
# An entsprechenden Modus delegieren
result = await self.mode.executeTask(taskStep, workflow, context, taskIndex, totalTasks)
# Progress abschließen
progressLogger.completeOperation(operationId, True)
return result
```
**Aufgerufene Funktionen:**
- `self.services.workflow.createProgressLogger()` → Progress Logger erstellen
- `progressLogger.startOperation()` → Operation starten
- `self._checkWorkflowStopped()` → Workflow-Status prüfen
- `progressLogger.updateProgress()` → Progress aktualisieren
- `self.mode.executeTask()` → Modus-spezifische Task-Ausführung
- `progressLogger.completeOperation()` → Operation abschließen
## 10. Modus-spezifische Task-Ausführung
### ActionplanMode.executeTask():
```python
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
taskIndex: int = None, totalTasks: int = None) -> TaskResult:
# Task-Start-Nachricht erstellen
await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
# Aktionsitems generieren
actionItems = await self.generateActionItems(taskStep, workflow, context.previous_results, context)
# Aktionsitems ausführen
actionResults = []
for actionItem in actionItems:
actionResult = await self.actionExecutor.executeAction(
actionItem.methodName, actionItem.actionName, actionItem.parameters
)
actionResults.append(actionResult)
# Task-Completion überprüfen
reviewResult = await self._reviewTaskCompletion(taskStep, actionItems, actionResults, workflow)
# Bei Erfolg: Task-Erfolgs-Nachricht
if reviewResult.success:
await self.messageCreator.createTaskSuccessMessage(taskStep, workflow, taskIndex, reviewResult)
return TaskResult(success=True, feedback=reviewResult.feedback)
# Bei Fehler: Retry oder Fehler-Nachricht
else:
await self.messageCreator.createRetryMessage(taskStep, workflow, taskIndex, reviewResult)
return TaskResult(success=False, error=reviewResult.reason)
```
### ReactMode.executeTask():
```python
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
taskIndex: int = None, totalTasks: int = None) -> TaskResult:
# Intent-Analyse
self.workflowIntent = await self.intentAnalyzer.analyzeUserIntent(original_prompt, context)
self.taskIntent = await self.intentAnalyzer.analyzeUserIntent(taskStep.objective, context)
# Progress Tracker zurücksetzen
self.progressTracker.reset()
# Workflow-Objekt aktualisieren
self._updateWorkflowBeforeExecutingTask(taskIndex)
# Task-Start-Nachricht erstellen
await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)
# React-Loop ausführen
state = TaskExecutionState(taskStep)
state.max_steps = max(1, int(getattr(workflow, 'maxSteps', 5)))
step = 1
while step <= state.max_steps:
# Workflow-Status prüfen
self._checkWorkflowStopped(workflow)
# Aktionsitems für diesen Schritt generieren
actionItems = await self.generateActionItems(taskStep, workflow, state.previous_results, context)
# Aktionsitems ausführen
actionResults = []
for actionItem in actionItems:
actionResult = await self.actionExecutor.executeAction(
actionItem.methodName, actionItem.actionName, actionItem.parameters
)
actionResults.append(actionResult)
# Content validieren
validationResult = await self.contentValidator.validateContent(
taskStep, actionResults, self.taskIntent, context
)
# Bei Erfolg: Task abschließen
if validationResult.success:
await self.messageCreator.createTaskSuccessMessage(taskStep, workflow, taskIndex, validationResult)
return TaskResult(success=True, feedback=validationResult.feedback)
# Bei Fehler: Nächster Schritt
step += 1
state.previous_results.append(validationResult.feedback)
# Nach max_steps: Task als fehlgeschlagen markieren
await self.messageCreator.createErrorMessage(taskStep, workflow, taskIndex, "Task failed after all steps")
return TaskResult(success=False, error="Task failed after all steps")
```
## 11. WorkflowManager._processWorkflowResults()
```python
async def _processWorkflowResults(self, workflow: ChatWorkflow) -> None:
# Workflow-Status prüfen
self.workflowProcessor._checkWorkflowStopped(workflow)
if workflow.status == 'stopped':
# Gestoppte Workflow-Nachricht erstellen
stopped_message = {...}
self.services.workflow.storeMessageWithDocuments(workflow, stopped_message, [])
# Workflow-Status aktualisieren
workflow.status = "stopped"
self.services.workflow.updateWorkflow(workflow.id, {...})
# Log-Eintrag hinzufügen
self.services.workflow.storeLog(workflow, {...})
elif workflow.status == 'failed':
# Fehler-Nachricht erstellen
error_message = {...}
self.services.workflow.storeMessageWithDocuments(workflow, error_message, [])
# Workflow-Status aktualisieren
workflow.status = "failed"
self.services.workflow.updateWorkflow(workflow.id, {...})
# Log-Eintrag hinzufügen
self.services.workflow.storeLog(workflow, {...})
else:
# Erfolgreiche Workflow-Abschluss-Nachricht
await self._sendLastMessage(workflow)
```
**Aufgerufene Funktionen:**
- `self.workflowProcessor._checkWorkflowStopped()` → Workflow-Status prüfen
- `self.services.workflow.storeMessageWithDocuments()` → Nachricht speichern
- `self.services.workflow.updateWorkflow()` → Workflow aktualisieren
- `self.services.workflow.storeLog()` → Log-Eintrag speichern
- `self._sendLastMessage()` → Abschluss-Nachricht senden
## 12. WorkflowManager._sendLastMessage()
```python
async def _sendLastMessage(self, workflow: ChatWorkflow) -> None:
# Feedback generieren
feedback = await self._generateWorkflowFeedback(workflow)
# Abschluss-Nachricht erstellen
messageData = {
"workflowId": workflow.id,
"role": "assistant",
"message": feedback,
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": self.services.utils.timestampGetUtc(),
"documentsLabel": "workflow_feedback",
"documents": [],
"roundNumber": workflow.currentRound,
"taskNumber": 0,
"actionNumber": 0,
"taskProgress": "success",
"actionProgress": "success"
}
# Nachricht speichern
self.services.workflow.storeMessageWithDocuments(workflow, messageData, [])
# Workflow-Status auf abgeschlossen setzen
workflow.status = "completed"
workflow.lastActivity = self.services.utils.timestampGetUtc()
# Workflow in Datenbank aktualisieren
self.services.workflow.updateWorkflow(workflow.id, {
"status": "completed",
"lastActivity": workflow.lastActivity
})
# Abschluss-Log-Eintrag
self.services.workflow.storeLog(workflow, {
"message": "Workflow completed",
"type": "success",
"status": "completed",
"progress": 100
})
```
**Aufgerufene Funktionen:**
- `self._generateWorkflowFeedback()` → Feedback generieren
- `self.services.utils.timestampGetUtc()` → Zeitstempel abrufen
- `self.services.workflow.storeMessageWithDocuments()` → Nachricht speichern
- `self.services.workflow.updateWorkflow()` → Workflow aktualisieren
- `self.services.workflow.storeLog()` → Log-Eintrag speichern
## 13. WorkflowManager._generateWorkflowFeedback()
```python
async def _generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str:
# Workflow-Status prüfen
self.workflowProcessor._checkWorkflowStopped(workflow)
# Nachrichten nach Rolle zählen
user_messages = [msg for msg in workflow.messages if msg.role == 'user']
assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant']
# Feedback zusammenstellen
feedback = f"Workflow completed.\n\n"
feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n"
# Finalen Status hinzufügen
if workflow.status == "completed":
feedback += "All tasks completed successfully."
elif workflow.status == "partial":
feedback += "Some tasks completed with partial success."
else:
feedback += f"Workflow status: {workflow.status}"
return feedback
```
**Aufgerufene Funktionen:**
- `self.workflowProcessor._checkWorkflowStopped()` → Workflow-Status prüfen
- `len()` → Nachrichten zählen
## Zusammenfassung der Hauptfunktionsaufrufe
### Häufigste Aufrufe:
1. **AI-Calls**: `callAiPlanning()` - für Aufgabenplanung, Intent-Analyse, Content-Validierung
2. **Workflow-Status**: `_checkWorkflowStopped()` - in fast jedem Schritt
3. **Progress-Logging**: `createProgressLogger()`, `startOperation()`, `updateProgress()`, `completeOperation()`
4. **Message-Creation**: `createTaskStartMessage()`, `createTaskSuccessMessage()`, `createErrorMessage()`
5. **Database-Operations**: `storeMessageWithDocuments()`, `updateWorkflow()`, `storeLog()`
6. **Action-Execution**: `executeAction()` - für alle Workflow-Aktionen
7. **Content-Validation**: `validateContent()` - für React-Modus
8. **Intent-Analysis**: `analyzeUserIntent()` - für React-Modus
### Kritische Pfade:
- **ActionplanMode**: Plan → Execute → Review → Retry/Success
- **ReactMode**: Plan → Execute → Validate → Step/Success
- **Error-Handling**: Stop → HandleStop / Error → HandleError
- **Message-Flow**: First → Task → Action → Last