This commit is contained in:
ValueOn AG 2025-12-03 23:02:58 +01:00
parent e7d4843d54
commit e4225a88ea
14 changed files with 6781 additions and 0 deletions

View file

@ -0,0 +1 @@
<mxfile host="Electron" modified="2025-12-02T13:13:53.592Z" agent="5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/20.3.0 Chrome/104.0.5112.114 Electron/20.1.3 Safari/537.36" etag="Qk9c24H_WZZPEQdUvbO1" version="20.3.0" type="device"><diagram name="Module Dependencies" id="module-dependencies">7ZzZcuI4FIafhnvLC8tldxJ6uipT1dVc9LViH4M7xqKECGGefuRYwrJlHJrxoilxlfjIC/yftv8YaeI9bN+/Ubzb/E0iSCeuE71PvMeJ67oL3+V/8sipiCAXzYrImiaRiJWBVfIPiKAjoockgn3lREZIypJdNRiSLIOQVWKYUnKsnhaTtPrUHV6DFliFONWjv5KIbUTUd5yy4C9I1hv5aEeWbLE8WwT2GxyRoxLynibeAyWEFf9t3x8gzeWTwhTXLS+Unj8ZhYxdcwF/PoWouOoNpweohpfiU7KT/O6UHLII8qvRxPt63CQMVjsc5qVHjpvHNmybiuI4SdMHkhL6ca0HKA7imMf3jJJXUEocFMwWL+cSqaqb34NkbCWej+RxUSWQz4/1byxEeAPK4F0JCQW+AdkCoyd+iiidSjyiQi7k8VGhK4ltFLBTEcOiQq3Pty4l5/8I1ZsJRJjhbd5G9hoFtahbErEHQRw0kfAx8uehKSRmwYAkRHdBqE5CLeqYRMxZOE0kYBrwjmMsEm6NxHTINoGTkFDQKMiwHQTKUWMMBHugb0kIelMoCzoeHOZxAIsmDOglAHc0DLMaBX8+IIUkY0Bj3MRBLeq8QSxCv4lEHMxiPlcbiYRfI+EN2x7CA03YqaE9yIKuKcALQBOFlxkK0WhD9HxWozDkEB0DZgfa0BrKgo4poHje3Ct53nSBYCwK9eEZDUnhSOhrnOY2qo5BKel4dHBiN0aN1sHxI9+cQXpQEFxR1tAYZLjjphCCD2ETgvncAT8eC0HdMwwJAKI1IE1/Tfb8NKkCoWxD1iTD6VMZ/VoFU57zTMhOBH8DYyehHT4wUoWlSw9Z9CVPdvDDl5SEr0VomaTykt+H7U5+KkzDNhp7cqBhgyv9SL1gugZxuppOyL90Kz8KKWbJWzWp8l9puJbRqJtWlUad1DhEvDsRg9qHbxkNNZVgXtsI7jQMaRdTy0hU8zsqC5XROCxmdxZG9VLzOw9xcj0fNw6PxZ2HHDcqWblxaMi30nccBgzjyDY3Xu+RzBrJkW12vA1H3RiOhMQ2P96GxIz5FbLNlLchqb7iGQmIbb68DUh1uB8JiG32vBXI+LMs+xy66jMMHEHss+iXgJhh0ZF9Hv0SkPG7K9c2h16dQ5nXXbm2ufTLQMzorqT+dyBGTHdd2zx6C47xRw/b3PllGLUf/o3EwzZzXlPdrAyva5szb6FhyOTKNnPeQsSIwdw2b97GY/zR3DZjrv5G3LzOyrPNml/CYcJrD882W34Jhhmm3LPNlF/CYcKvfDzbLPllGONPqTzbLPlFGKPPp7z/rRvXlrFFnhu7+TK2CO835w9z6/sQhYxZQ0sKay6ERu1ZCSvsGLyzJqmlbhnJcoDqWkERwmmyzvhhCnF+h3xBXxLi9IsIb5Mo+mDftAqxrA/5ys62RYRuG5DrFxEGn68hdBrWEHodrCEseOhTr2WuAUdPMh5/xiegGpzBN+PpXulrd1PoTml9VvXIrVAvGt+0zU73GiP5e6bhRNZnS9+zmGL+bQ9h7nP6kfuWnVx6kPvaFcjdya3Ph1bFNK2fruOWrVp60Hk+eLXWJzvf5XjeV43+861Yulf6vF3dcErrrxVWZxfYi9A37LbSg9DB4FVaf1uwLFJR/eh8y34qPei8GLxC628Bfok8eT999C0bpnQvtOcOXqH1BP/P3D33U51v2ROlB5Wng1fnhlVvDwkNDynmCjuPsOPnQBaeNLmNMZTCQDr9EFlcWe/d7ohck72/Limi1tYr0yTVVvR5xucP0iqfpkgUjYMGiWXs6kyKeMIPkvBPouwzVCHs18kV2R9xUQlPuw9Cn9yoSBdpN/qoBedv3VQx+GG5c3BxerkDs/f0Lw==</diagram></mxfile>

View file

@ -0,0 +1,290 @@
# AI Call Flow Architecture Analysis
## Executive Summary
This document analyzes the current AI call flow in the workflow system and compares it with Claude's approach to identify weaknesses causing:
1. **Over-complication for simple requests**
2. **Slow rendering for small documents**
## Current Architecture Flow
### Complete Flow for `ai.process` Action
```
User Request
workflowProcessor.generateTaskPlan()
├─→ modeDynamic.generateTaskPlan() [AI Call #1: Task Planning]
└─→ Creates TaskPlan with TaskSteps
workflowProcessor.executeTask()
├─→ modeDynamic.executeTask()
│ ├─→ _planSelect() [AI Call #2: Action Selection]
│ │ └─→ generateDynamicPlanSelectionPrompt()
│ │ └─→ callAiPlanning() [~30s, DETAILED mode]
│ │
│ ├─→ _actExecute()
│ │ ├─→ generateDynamicParametersPrompt() [AI Call #3: Parameter Generation]
│ │ │ └─→ callAiPlanning() [~30s, DETAILED mode]
│ │ │
│ │ ├─→ actionExecutor.executeSingleAction()
│ │ │ └─→ methodAi.process()
│ │ │ ├─→ progressLogStart()
│ │ │ ├─→ getChatDocumentsFromDocumentList() [Document Loading]
│ │ │ ├─→ _analyzePromptAndCreateOptions() [AI Call #4: Prompt Analysis]
│ │ │ │ └─→ callAiPlanning() [~10s, BASIC mode]
│ │ │ │
│ │ │ ├─→ callAiDocuments()
│ │ │ │ ├─→ progressLogStart() [Nested progress tracking]
│ │ │ │ ├─→ callAiText() [if documents exist]
│ │ │ │ │ └─→ extractionService.processDocumentsPerChunk()
│ │ │ │ │ └─→ Multiple AI calls per chunk [AI Call #5-N]
│ │ │ │ │
│ │ │ │ ├─→ buildGenerationPrompt() [Complex JSON template]
│ │ │ │ ├─→ _callAiWithLooping()
│ │ │ │ │ ├─→ AI Call [AI Call #6: First iteration]
│ │ │ │ │ ├─→ Check complete_response flag
│ │ │ │ │ ├─→ Extract sections
│ │ │ │ │ ├─→ Repair broken JSON if needed
│ │ │ │ │ └─→ Loop up to 50 iterations [AI Call #6-55]
│ │ │ │ │
│ │ │ │ ├─→ Parse generated JSON
│ │ │ │ ├─→ generationService.renderReport() [RENDERING PHASE]
│ │ │ │ │ ├─→ _getFormatRenderer()
│ │ │ │ │ ├─→ renderer.render() [Format-specific rendering]
│ │ │ │ │ │ └─→ For DOCX: python-docx library calls
│ │ │ │ │ │ └─→ For PDF: ReportLab/other library
│ │ │ │ │ │ └─→ For HTML: Template rendering
│ │ │ │ │ └─→ Returns rendered bytes/base64
│ │ │ │ │
│ │ │ │ └─→ Build result dict
│ │ │ │
│ │ │ └─→ progressLogFinish()
│ │ │
│ │ └─→ progressLogFinish()
│ │
│ ├─→ _observeBuild()
│ │ └─→ Build Observation object
│ │
│ ├─→ contentValidator.validateContent() [AI Call #7: Content Validation]
│ │ └─→ Multiple validation checks
│ │
│ ├─→ _refineDecide() [AI Call #8: Refinement Decision]
│ │ ├─→ extractReviewContent()
│ │ ├─→ generateDynamicRefinementPrompt()
│ │ └─→ callAiPlanning() [~30s, ADVANCED mode]
│ │
│ └─→ Loop continues if decision = "continue"
└─→ createTaskCompletionMessage()
```
### Key Bottlenecks Identified
#### 1. **Multiple AI Calls for Simple Requests**
**Problem**: Even for a simple "generate a text file" request, the system makes:
- **AI Call #1**: Task Planning (unnecessary for simple requests)
- **AI Call #2**: Action Selection (could be deterministic)
- **AI Call #3**: Parameter Generation (overkill for simple prompts)
- **AI Call #4**: Prompt Analysis (redundant - prompt is already clear)
- **AI Call #5-N**: Document extraction per chunk (if documents exist)
- **AI Call #6-55**: Document generation with looping (up to 50 iterations!)
- **AI Call #7**: Content Validation (could be optional for simple outputs)
- **AI Call #8**: Refinement Decision (unnecessary if output is simple)
**Total**: 8-60+ AI calls for a simple request that Claude handles in 1-2 calls.
#### 2. **Complex Prompt Generation**
**Current Approach**:
- Stage 1: `generateDynamicPlanSelectionPrompt()` - Large template with many placeholders
- Stage 2: `generateDynamicParametersPrompt()` - Another large template
- Stage 3: `buildGenerationPrompt()` - Complex JSON template with sections structure
**Claude's Approach**: Direct prompt, minimal overhead.
#### 3. **Inefficient Rendering**
**Current Flow**:
```
AI generates JSON with sections
Parse JSON
Extract sections array
Get format renderer
Renderer processes sections
├─→ For DOCX: Create Document object
├─→ For each section: Add paragraph/heading
├─→ Apply formatting
├─→ Generate Table of Contents
└─→ Convert to bytes/base64
```
**Issues**:
- Rendering happens AFTER AI generation completes
- No streaming or progressive rendering
- Full document structure built even for simple text
- Complex renderers for simple formats (e.g., TXT rendered through DOCX pipeline)
#### 4. **Unnecessary Iteration Looping**
**Current**: `_callAiWithLooping()` loops up to 50 times:
- Checks for `complete_response` flag
- Repairs broken JSON
- Extracts sections incrementally
- Continues until complete
**For Simple Requests**: This is overkill. A simple text generation should be single-shot.
#### 5. **Redundant Progress Tracking**
- Nested progress tracking (method level + service level)
- Multiple progress updates for same operation
- Progress logging adds overhead
## Claude's Architecture (From Concept Documents)
### Claude's Flow
```
User Input
Input Reception & Analysis [AI Call #1: Semantic Understanding]
├─→ AI understands intent semantically (not regex/keyword matching)
├─→ Detects patterns like "write a document" → create docx
├─→ Detects "continue our conversation" → use past chats tool
├─→ Multi-language support (semantic, not pattern-based)
└─→ Categorizes request complexity
Understanding + Execution [Combined AI Call]
├─→ Simple requests: 1 AI call that understands AND executes
│ └─→ AI generates content directly, no separate parameter generation
├─→ Moderate requests: 1-2 AI calls total
└─→ Complex requests: 5-20 AI calls (iterative research + generation)
Tool Selection [Part of AI understanding, not separate call]
├─→ AI understands which tool to use as part of intent analysis
└─→ Direct tool execution (no separate parameter generation call)
Execution [Direct tool calls]
├─→ web_search → Direct API call
├─→ create_file → Direct file creation (no rendering pipeline)
└─→ bash_tool → Direct command execution
Output [Minimal formatting]
├─→ Text: Direct return
├─→ Files: Copy to output directory (no JSON → render pipeline)
└─→ Code: Direct render
```
### Key Differences
1. **Semantic AI Understanding**: Claude uses AI for pattern matching, but it's semantic understanding (not regex). The AI understands "write a document" means create docx, regardless of language.
2. **Combined AI Calls**: Instead of separate calls for plan → select → parameters → generate, Claude makes 1 AI call that understands intent AND generates output
3. **No Separate Parameter Generation**: When AI understands "create a text file with Hello World", it directly generates the content - no separate parameter extraction step
4. **Progressive Complexity**: Simple = 1 AI call (understand + execute), Complex = 5-20 AI calls (iterative)
5. **No Rendering Pipeline**: Files are created directly from AI output, not rendered from JSON structure
6. **Streaming Output**: Results shown as they're generated
## Comparison Table
| Aspect | Current System | Claude's Approach | Impact |
|--------|---------------|-------------------|---------|
| **Simple Request AI Calls** | 8-60+ calls (sequential) | 1-2 calls (combined) | **40x overhead** |
| **Action Selection** | Separate AI call (30s) | Part of understanding call | **30s saved** |
| **Parameter Generation** | Separate AI call (30s) | Combined with generation | **30s saved** |
| **Prompt Analysis** | Separate AI call (10s) | Part of understanding call | **10s saved** |
| **Document Generation** | Looping (up to 50 iterations) | Single-shot for simple | **Variable** |
| **Rendering** | Post-generation pipeline | Direct file creation | **Slow for small docs** |
| **Content Validation** | Always separate AI call | Optional/combined | **30s saved** |
| **Refinement Decision** | Always separate AI call | Combined with understanding | **30s saved** |
## Root Causes
### 1. Over-Complication
**Root Cause**: The system makes separate AI calls for each step (plan → select → parameters → generate → validate → refine), even when a single AI call could understand intent AND execute.
**Solution**: Combine AI calls for efficiency:
- **Single AI Call for Simple Requests**: One call that understands intent AND generates output (like Claude)
- **Combined Understanding**: Merge action selection + parameter generation into the generation call
- **Skip Mechanical Steps**: Don't make separate AI calls for steps that can be inferred from the main understanding
### 2. Slow Rendering
**Root Cause**: Rendering happens as a separate phase AFTER AI generation, using complex renderers even for simple formats.
**Solution**:
- For simple formats (TXT, MD): Return directly from AI, no rendering
- For complex formats (DOCX, PDF): Use lightweight renderers for small documents
- Implement streaming rendering for large documents
- Cache renderer instances
## Recommendations
### Immediate Fixes (High Impact, Low Effort)
1. **Combine AI Calls for Simple Requests**
- **Key Insight**: Claude uses AI for semantic understanding, but combines understanding + execution
- Merge action selection + parameter generation into the main generation call
- Use one AI call that understands intent AND generates output (not separate calls)
- Skip separate refinement decision if output is simple (check in same call)
2. **Optimize Rendering**
- For TXT/MD: Return AI output directly, no rendering
- For small documents (<10KB): Use lightweight renderers
- Cache renderer instances
3. **Reduce Iteration Looping**
- For simple requests: Single-shot AI call (no looping)
- Only use looping for complex/long documents
### Medium-Term Improvements
1. **Request Complexity Detection**
- Add complexity analyzer (pattern-based, not AI-based)
- Route to appropriate workflow path
2. **Streaming Output**
- Stream AI responses as they're generated
- Progressive rendering for large documents
3. **Direct Tool Execution**
- For simple actions: Skip parameter generation AI call
- Use default parameters or pattern-based parameter extraction
### Long-Term Architecture Changes
1. **Unified AI Call Interface**
- Single entry point with complexity-aware routing
- Automatic optimization based on request type
2. **Progressive Enhancement**
- Start with simple execution
- Add complexity only if needed (validation fails, user requests refinement)
3. **Renderer Optimization**
- Lazy rendering (only when needed)
- Format-specific optimizations
- Parallel rendering for multiple documents
## Implementation Priority
1. **P0 (Critical)**: Skip unnecessary AI calls for simple requests
2. **P0 (Critical)**: Optimize rendering for simple formats
3. **P1 (High)**: Reduce iteration looping for simple requests
4. **P1 (High)**: Add request complexity detection
5. **P2 (Medium)**: Implement streaming output
6. **P3 (Low)**: Long-term architecture refactoring
## Metrics to Track
- **AI Calls per Request**: Target <2 for simple requests
- **Rendering Time**: Target <1s for simple documents
- **Total Request Time**: Target <5s for simple requests
- **User Satisfaction**: Measure via feedback

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,942 @@
# AI Plan Implementation Guide
## Overview
This document provides a structured implementation plan to adapt the current codebase to the enhanced architecture described in `ai_plan_architecture.md`.
**Goal**: Implement all architectural improvements while maintaining backward compatibility and clear tracking.
---
## Reference Model: Implementation Phases
```
Phase 1: Foundation Models (Pydantic)
├─> New models: ActionDefinition, AiResponse, DocumentReference types
└─> Enhanced models: ChatWorkflow, TaskContext
Phase 2: State Management (ChatWorkflow)
├─> Add execution state fields
├─> Add helper methods (getRoundIndex, incrementRound, etc.)
└─> Update all call sites
Phase 3: TaskContext Enhancement
├─> Add Stage 2 fields
├─> Add updateFromSelection method
└─> Remove SimpleNamespace workarounds
Phase 4: Document References (Typed Models)
├─> Create DocumentReference classes
├─> Replace string references with typed models
└─> Update document lookup logic
Phase 5: JSON Parsing (Structured)
├─> Create parseJsonWithModel utility
├─> Replace manual JSON parsing
└─> Update all AI response parsing
Phase 6: AI Service Consolidation
├─> Consolidate callAiDocuments + callAiText → callAiContent
├─> Update all call sites
└─> Ensure contentParts-only approach
Phase 7: Document Extraction Separation
├─> Add extractContent action to methodAi
├─> Remove extraction from AI calls
└─> Update action execution flow
Phase 8: Remove Actionplan Mode
├─> Remove modeActionplan.py
├─> Remove promptGenerationActionsActionplan.py
├─> Remove WorkflowModeEnum.WORKFLOW_ACTIONPLAN
└─> Update workflowProcessor to remove ActionplanMode references
Phase 9: Testing & Validation
├─> Unit tests for new models
├─> Integration tests for workflow
└─> End-to-end validation
```
---
## Model Segregation Rules
### Clear Separation of Concerns
**`datamodelChat.py`** (Chat/Workflow State Models):
- `ChatWorkflow` (enhanced with execution state)
- `TaskContext` (enhanced with Stage 2 fields)
- `ChatMessage` (existing)
- `ChatDocument` (existing)
**`datamodelWorkflow.py`** (Workflow Execution Models):
- `ActionDefinition` (action selection and parameters)
- `AiResponse` (unified AI response)
- `AiResponseMetadata` (response metadata)
- `DocumentData` (single document in response)
- `RequestContext` (workflow-level request)
- `UnderstandingResult` (workflow-level understanding)
- `TaskDefinition` (workflow-level task)
- `TaskResult` (workflow-level result)
**`datamodelDocref.py`** (Document Reference Models):
- `DocumentReference` (base class)
- `DocumentListReference` (docList: references)
- `DocumentItemReference` (docItem: references)
- `DocumentReferenceList` (list wrapper)
**`jsonUtils.py`** (JSON Utilities):
- All JSON parsing utilities (including `parseJsonWithModel`)
- No separate `jsonModelUtils.py`
---
## Phase 1: Foundation Models (Pydantic)
### 1.1 Create New Pydantic Models
**File**: `gateway/modules/datamodels/datamodelWorkflow.py` (NEW)
**Models to Create** (Workflow execution models):
- `ActionDefinition` (replaces `ActionSelection` + `ActionParameters`)
- `AiResponse` (unified response model)
- `AiResponseMetadata` (response metadata)
- `DocumentData` (single document in response)
- `RequestContext` (workflow-level)
- `UnderstandingResult` (workflow-level)
- `TaskDefinition` (workflow-level)
- `TaskResult` (workflow-level)
**Segregation Rule**: `datamodelWorkflow.py` contains models related to **workflow execution** (actions, AI responses, task definitions).
**Reference**: `ai_plan_architecture.md` lines 216-335
**Implementation Checklist**:
- [ ] Create `datamodelWorkflow.py`
- [ ] Implement `ActionDefinition` with `needsStage2()` method
- [ ] Implement `AiResponse` with `toJson()` method
- [ ] Implement `AiResponseMetadata` with `fromDict()` classmethod
- [ ] Implement workflow-level models (`RequestContext`, `UnderstandingResult`, etc.)
- [ ] Add unit tests for model validation
---
### 1.2 Create Document Reference Models
**File**: `gateway/modules/datamodels/datamodelDocref.py` (NEW)
**Models to Create** (Document reference models):
- `DocumentReference` (base class)
- `DocumentListReference` (docList: references)
- `DocumentItemReference` (docItem: references)
- `DocumentReferenceList` (list wrapper with conversion methods)
**Segregation Rule**: `datamodelDocref.py` contains models related to **document references** (how documents are referenced in workflows).
**Reference**: `ai_plan_architecture.md` lines 174-212
**Implementation Checklist**:
- [ ] Create `datamodelDocref.py`
- [ ] Implement `DocumentReference` base class
- [ ] Implement `DocumentListReference` with `to_string()` method
- [ ] Implement `DocumentItemReference` with `to_string()` method
- [ ] Implement `DocumentReferenceList` with `to_string_list()` and `from_string_list()` methods
- [ ] Add parsing logic for string → typed conversion
- [ ] Add unit tests for reference parsing
---
## Phase 2: State Management (ChatWorkflow)
### 2.1 Enhance ChatWorkflow Model
**File**: `gateway/modules/datamodels/datamodelChat.py`
**Segregation Rule**: `datamodelChat.py` contains models related to **chat/workflow state** (ChatWorkflow, TaskContext, ChatMessage, ChatDocument).
**Changes**:
```python
class ChatWorkflow(BaseModel):
# ... existing fields ...
# NEW: Execution state
currentRound: int = 0
currentTask: int = 0
currentAction: int = 0
# NEW: Helper methods
def getRoundIndex(self) -> int:
return self.currentRound
def getTaskIndex(self) -> int:
return self.currentTask
def getActionIndex(self) -> int:
return self.currentAction
def incrementRound(self):
self.currentRound += 1
self.currentTask = 0
self.currentAction = 0
def incrementTask(self):
self.currentTask += 1
self.currentAction = 0
def incrementAction(self):
self.currentAction += 1
```
**Reference**: `ai_plan_architecture.md` lines 117-152
**Implementation Checklist**:
- [ ] Add execution state fields to `ChatWorkflow`
- [ ] Add helper methods (`getRoundIndex`, `getTaskIndex`, `getActionIndex`)
- [ ] Add increment methods (`incrementRound`, `incrementTask`, `incrementAction`)
- [ ] Update database migration if needed (add columns)
- [ ] Add unit tests for state management
---
### 2.2 Update Call Sites (Remove Separate Index Parameters)
**Files to Update**:
- `gateway/modules/workflows/processing/workflowProcessor.py`
- `gateway/modules/workflows/processing/modes/modeDynamic.py`
- `gateway/modules/workflows/processing/actionExecutor.py`
**Changes**:
```python
# BEFORE
async def executeTask(taskStep, workflow, context, taskIndex: int, actionIndex: int):
# Use taskIndex, actionIndex parameters
# AFTER
async def executeTask(taskStep, workflow, context):
# Use workflow.getTaskIndex(), workflow.getActionIndex()
taskIndex = workflow.getTaskIndex()
actionIndex = workflow.getActionIndex()
```
**Reference**: `ai_plan_architecture.md` lines 435-568
**Implementation Checklist**:
- [ ] Remove `taskIndex`, `actionIndex` parameters from `workflowProcessor.executeTask()`
- [ ] Remove `taskIndex`, `actionIndex` parameters from `modeDynamic.executeTask()`
- [ ] Remove `taskIndex`, `actionIndex` parameters from `actionExecutor.executeSingleAction()`
- [ ] Update all call sites to use `workflow.getTaskIndex()`, `workflow.getActionIndex()`
- [ ] Update increment calls to use `workflow.incrementAction()`, `workflow.incrementTask()`
- [ ] Test workflow execution with state management
---
## Phase 3: TaskContext Enhancement
### 3.1 Enhance TaskContext Model
**File**: `gateway/modules/datamodels/datamodelChat.py`
**Segregation Rule**: `datamodelChat.py` contains `TaskContext` (part of chat workflow context).
**Changes**:
```python
class TaskContext(BaseModel):
# ... existing fields ...
# NEW: Stage 2 context fields
actionObjective: Optional[str] = None
parametersContext: Optional[str] = None
learnings: List[str] = Field(default_factory=list)
stage1Selection: Optional[Dict[str, Any]] = None
def updateFromSelection(self, selection: ActionDefinition):
"""Update context from Stage 1 selection"""
self.actionObjective = selection.actionObjective
self.parametersContext = selection.parametersContext
self.learnings = selection.learnings
self.stage1Selection = selection.model_dump()
```
**Reference**: `ai_plan_architecture.md` lines 154-172
**Implementation Checklist**:
- [ ] Add Stage 2 fields to `TaskContext`
- [ ] Add `updateFromSelection()` method
- [ ] Add unit tests for context updates
---
### 3.2 Remove SimpleNamespace Workarounds
**Files to Update**:
- `gateway/modules/workflows/processing/modes/modeDynamic.py`
**Changes**:
```python
# BEFORE
from types import SimpleNamespace
stage2Context = SimpleNamespace(
actionObjective=selection.actionObjective,
parametersContext=selection.parametersContext,
learnings=selection.learnings
)
# AFTER
context.updateFromSelection(selection)
# Use context.actionObjective, context.parametersContext directly
```
**Reference**: `ai_plan_architecture.md` lines 529-530
**Implementation Checklist**:
- [ ] Find all `SimpleNamespace` usages in `modeDynamic.py`
- [ ] Replace with `context.updateFromSelection(selection)`
- [ ] Update all references to use `context.actionObjective`, etc.
- [ ] Remove `SimpleNamespace` imports
- [ ] Test Stage 2 parameter generation
---
## Phase 4: Document References (Typed Models)
### 4.1 Update Document Reference Usage
**Files to Update**:
- `gateway/modules/workflows/processing/modes/modeDynamic.py` (`_planSelect`)
- `gateway/modules/services/serviceChat/mainServiceChat.py` (`getChatDocumentsFromDocumentList`)
**Changes**:
```python
# BEFORE
documentList: List[str] = ["docList:msg_123:label"]
# AFTER
from modules.datamodels.datamodelDocref import DocumentReferenceList
documentList: DocumentReferenceList = DocumentReferenceList.from_string_list(["docList:msg_123:label"])
```
**Reference**: `ai_plan_architecture.md` lines 500-507, 858-877
**Implementation Checklist**:
- [ ] Update `_planSelect()` to convert string references to `DocumentReferenceList`
- [ ] Update `ActionDefinition.documentList` to use `DocumentReferenceList`
- [ ] Update `getChatDocumentsFromDocumentList()` to accept `DocumentReferenceList`
- [ ] Update document lookup logic to use typed references
- [ ] Test document reference parsing and lookup
---
## Phase 5: JSON Parsing (Structured)
### 5.1 Create parseJsonWithModel Utility
**File**: `gateway/modules/utils/jsonUtils.py` (ADD to existing file)
**Note**: All JSON utilities are consolidated in `jsonUtils.py` (no separate `jsonModelUtils.py`).
**Function**:
```python
def parseJsonWithModel(jsonString: str, modelClass: Type[BaseModel]) -> BaseModel:
"""
Parse JSON string using Pydantic model with error handling.
Uses existing jsonUtils methods:
- tryParseJson() - Safe parsing
- repairBrokenJson() - Repairs broken JSON
- extractJsonString() - Extracts JSON from text
Returns:
Parsed Pydantic model instance
Raises:
ValueError: If JSON cannot be parsed or validated
"""
```
**Reference**: `ai_plan_architecture.md` lines 495-500, 538
**Implementation Checklist**:
- [ ] Add `parseJsonWithModel()` function to existing `jsonUtils.py`
- [ ] Integrate with existing `jsonUtils` methods (`tryParseJson`, `repairBrokenJson`, `extractJsonString`)
- [ ] Add error handling and validation
- [ ] Add unit tests for parsing edge cases
---
### 5.2 Replace Manual JSON Parsing
**Files to Update**:
- `gateway/modules/workflows/processing/modes/modeDynamic.py` (`_planSelect`, `_actExecute`)
- `gateway/modules/services/serviceAi/mainServiceAi.py` (AI response parsing)
**Changes**:
```python
# BEFORE
responseJson = json.loads(response.content)
# Manual field access
action = responseJson.get("action")
parameters = responseJson.get("parameters")
# AFTER
from modules.utils.jsonUtils import parseJsonWithModel
from modules.datamodels.datamodelWorkflow import ActionDefinition
selection = parseJsonWithModel(response.content, ActionDefinition)
# Type-safe access
action = selection.action
parameters = selection.parameters
```
**Reference**: `ai_plan_architecture.md` lines 495-500, 538, 677
**Implementation Checklist**:
- [ ] Replace manual JSON parsing in `_planSelect()` with `parseJsonWithModel(..., ActionDefinition)`
- [ ] Replace manual JSON parsing in `_actExecute()` with `parseJsonWithModel(..., ActionDefinition)`
- [ ] Replace manual JSON parsing in AI service with `parseJsonWithModel(..., UnifiedJsonDocument)`
- [ ] Remove manual `find()`, `rfind()`, string manipulation for JSON
- [ ] Test JSON parsing with various response formats
---
## Phase 6: AI Service Consolidation
### 6.1 Consolidate AI Service Methods
**File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
**Changes**:
```python
# BEFORE
async def callAiDocuments(...) -> Dict[str, Any]:
# Document generation logic
async def callAiText(...) -> str:
# Text processing logic
# AFTER
async def callAiContent(
prompt: str,
contentParts: Optional[List[ContentPart]] = None,
options: AiCallOptions,
outputFormat: Optional[str] = None,
title: Optional[str] = None
) -> AiResponse:
# Unified logic for both text and documents
# Returns AiResponse model
```
**Reference**: `ai_plan_architecture.md` lines 629-691
**Implementation Checklist**:
- [ ] Create `callAiContent()` method (unified)
- [ ] Move document generation logic from `callAiDocuments()` to `callAiContent()`
- [ ] Move text processing logic from `callAiText()` to `callAiContent()`
- [ ] Keep `callAiPlanning()` separate (unchanged)
- [ ] Update return type to `AiResponse`
- [ ] Mark `callAiDocuments()` and `callAiText()` as deprecated (or remove)
---
### 6.2 Update AI Service Call Sites
**Files to Update**:
- `gateway/modules/workflows/methods/methodAi.py` (`process` action)
- All other methods that call AI service
**Changes**:
```python
# BEFORE
result = await self.services.ai.callAiDocuments(prompt, documents, options)
# or
result = await self.services.ai.callAiText(prompt, contentParts, options)
# AFTER
result = await self.services.ai.callAiContent(
prompt=prompt,
contentParts=contentParts, # Already extracted
options=options,
outputFormat=outputFormat # If document generation
)
# Returns AiResponse
```
**Reference**: `ai_plan_architecture.md` lines 615-625
**Implementation Checklist**:
- [ ] Update `methodAi.process()` to use `callAiContent()`
- [ ] Update all other method actions that call AI service
- [ ] Ensure `contentParts` are extracted before calling (see Phase 7)
- [ ] Update result handling to use `AiResponse` model
- [ ] Test AI calls with both text and document generation
---
## Phase 7: Document Extraction Separation
### 7.1 Create ExtractContent Action
**File**: `gateway/modules/workflows/methods/methodAi.py` (ADD to existing)
**Action**:
```python
@action
async def extractContent(self, parameters: ExtractContentParameters) -> ActionResult:
"""
Extract content from documents (separate from AI calls).
Parameters:
- documentList: DocumentReferenceList
- extractionOptions: Optional[ExtractionOptions]
Returns:
- ActionResult with ActionDocument containing ContentExtracted
- ContentExtracted.parts contains List[ContentPart]
"""
# Call extractionService.extractContent()
# Return ActionResult with ContentExtracted
```
**Reference**: `ai_plan_architecture.md` lines 721-786
**Implementation Checklist**:
- [ ] Create `ExtractContentParameters` model (in same module as action, following action registry pattern)
- [ ] Add `extractContent` action to `methodAi` class
- [ ] Implement extraction logic (calls `extractionService.extractContent()`)
- [ ] Return `ActionResult` with `ContentExtracted` objects
- [ ] Add unit tests for extraction action
---
### 7.2 Remove Extraction from AI Calls
**File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
**Changes**:
```python
# BEFORE
async def callAiContent(prompt, documents, options):
# Extract documents here
extractedContent = extractionService.extractContent(documents)
# Process with AI
# AFTER
async def callAiContent(prompt, contentParts, options):
# contentParts already extracted (required parameter)
# Process with AI directly
# NO extraction logic here
```
**Reference**: `ai_plan_architecture.md` lines 686-690
**Implementation Checklist**:
- [ ] Remove extraction logic from `callAiContent()`
- [ ] Make `contentParts` a required parameter (if documents need processing)
- [ ] Update method signature to only accept `contentParts` (not `documents`)
- [ ] Update all call sites to extract before calling
- [ ] Test that extraction is done separately before AI calls
---
### 7.3 Update Action Execution Flow
**Files to Update**:
- `gateway/modules/workflows/processing/modes/modeDynamic.py` (`_actExecute`)
- `gateway/modules/workflows/methods/methodAi.py` (`process`)
**Changes**:
```python
# BEFORE
# In methodAi.process()
documents = parameters.get("documentList", [])
# Extract and process in one step
result = await self.services.ai.callAiContent(prompt, documents, options)
# AFTER
# In _actExecute() or before calling methodAi.process()
if selection.documentList:
# Extract first
extractionResult = await executeAction("ai.extractContent", {
"documentList": selection.documentList,
"extractionOptions": task.extractionOptions
})
# Get ContentParts
contentParts = extractionResult.documents[0].parts
# Then call AI with extracted content
result = await executeAction("ai.process", {
"aiPrompt": prompt,
"contentParts": contentParts, # Already extracted
"resultType": resultType
})
```
**Reference**: `ai_plan_architecture.md` lines 622-625, 960-991
**Implementation Checklist**:
- [ ] Update workflow to call `ai.extractContent` before `ai.process`
- [ ] Update `methodAi.process()` to accept `contentParts` (not `documentList`)
- [ ] Update `AiProcessParameters` model to use `contentParts`
- [ ] Test extraction → AI call flow
- [ ] Ensure ContentParts are reusable across multiple AI calls
---
## Phase 8: Remove Actionplan Mode
### 8.1 Remove Actionplan Mode Files
**Files to Delete**:
- `gateway/modules/workflows/processing/modes/modeActionplan.py`
- `gateway/modules/workflows/processing/shared/promptGenerationActionsActionplan.py`
**Rationale**: Actionplan mode is no longer needed. Dynamic mode handles all workflow execution.
**Implementation Checklist**:
- [ ] Delete `modeActionplan.py`
- [ ] Delete `promptGenerationActionsActionplan.py`
- [ ] Verify no other files import these modules
---
### 8.2 Remove Actionplan Mode References
**File**: `gateway/modules/workflows/processing/workflowProcessor.py`
**Changes**:
```python
# BEFORE
from modules.workflows.processing.modes.modeActionplan import ActionplanMode
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC:
return DynamicMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_ACTIONPLAN:
return ActionplanMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
return AutomationMode(self.services)
# AFTER
# Remove ActionplanMode import
def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode:
if workflowMode == WorkflowModeEnum.WORKFLOW_DYNAMIC:
return DynamicMode(self.services)
elif workflowMode == WorkflowModeEnum.WORKFLOW_AUTOMATION:
return AutomationMode(self.services)
# Remove WORKFLOW_ACTIONPLAN case
```
**Implementation Checklist**:
- [ ] Remove `ActionplanMode` import from `workflowProcessor.py`
- [ ] Remove `WORKFLOW_ACTIONPLAN` case from `_createMode()` method
- [ ] Update error handling if needed
---
### 8.3 Remove WorkflowModeEnum.WORKFLOW_ACTIONPLAN
**File**: `gateway/modules/datamodels/datamodelChat.py`
**Changes**:
```python
# BEFORE
class WorkflowModeEnum(str, Enum):
WORKFLOW_DYNAMIC = "Dynamic"
WORKFLOW_ACTIONPLAN = "Actionplan"
WORKFLOW_AUTOMATION = "Automation"
# AFTER
class WorkflowModeEnum(str, Enum):
WORKFLOW_DYNAMIC = "Dynamic"
WORKFLOW_AUTOMATION = "Automation"
# Remove WORKFLOW_ACTIONPLAN
```
**Also Remove**:
- Any UI labels/translations for `WORKFLOW_ACTIONPLAN`
- Any default values or options that reference `WORKFLOW_ACTIONPLAN`
**Implementation Checklist**:
- [ ] Remove `WORKFLOW_ACTIONPLAN` from `WorkflowModeEnum`
- [ ] Remove UI labels/translations for Actionplan mode
- [ ] Remove any default values referencing Actionplan mode
- [ ] Update any database migrations if needed (existing workflows with ACTIONPLAN mode)
---
### 8.4 Clean Up Related References
**Files to Check**:
- `gateway/modules/workflows/processing/modes/modeDynamic.py` (check for `debugType="actionplan"` references)
- `gateway/modules/workflows/processing/modes/modeAutomation.py` (check for comments referencing ActionplanMode)
**Changes**:
```python
# In modeDynamic.py
# BEFORE
debugType="actionplan"
# AFTER
debugType="dynamic" # Or remove if not needed
```
**Implementation Checklist**:
- [ ] Update `debugType` in `modeDynamic.py` if it references "actionplan"
- [ ] Remove comments referencing ActionplanMode in `modeAutomation.py`
- [ ] Search codebase for any remaining "actionplan" or "Actionplan" references
- [ ] Update documentation if needed
---
### 8.5 Migration Strategy
**Database Considerations**:
- Existing workflows with `workflowMode = "Actionplan"` need to be migrated
- Options:
1. **Migrate to Dynamic**: Convert all ACTIONPLAN workflows to DYNAMIC
2. **Error on Load**: Fail gracefully if ACTIONPLAN mode is encountered
3. **Deprecation Period**: Keep enum but mark as deprecated, migrate gradually
**Recommended Approach**: Migrate to Dynamic mode
**Implementation Checklist**:
- [ ] Create migration script to convert ACTIONPLAN → DYNAMIC workflows
- [ ] Test migration on sample workflows
- [ ] Add validation to prevent creating new ACTIONPLAN workflows
- [ ] Document migration process
---
## Phase 9: Testing & Validation
### 9.1 Unit Tests
**Test Files to Create/Update**:
- `tests/unit/datamodels/test_workflow_models.py` (NEW - for datamodelWorkflow.py)
- `tests/unit/datamodels/test_docref.py` (NEW - for datamodelDocref.py)
- `tests/unit/services/test_ai_service.py` (UPDATE)
- `tests/unit/workflows/test_state_management.py` (NEW)
- `tests/unit/utils/test_json_utils.py` (UPDATE - for parseJsonWithModel)
**Test Coverage**:
- [x] `ActionDefinition.needsStage2()` logic
- [x] `DocumentReferenceList.from_string_list()` parsing (in datamodelDocref.py)
- [x] `ChatWorkflow` state increment methods
- [x] `TaskContext.updateFromSelection()` method
- [x] `parseJsonWithModel()` with various JSON formats
- [x] `callAiContent()` with text and document generation
**Test Files Created**:
- `tests/unit/datamodels/test_workflow_models.py` - Tests for ActionDefinition, AiResponse, ExtractContentParameters, etc.
- `tests/unit/datamodels/test_docref.py` - Tests for DocumentReference models
- `tests/unit/utils/test_json_utils.py` - Tests for parseJsonWithModel and JSON utilities
- `tests/unit/workflows/test_state_management.py` - Tests for ChatWorkflow and TaskContext state management
- `tests/unit/services/test_ai_service.py` - Tests for AI service methods
---
### 9.2 Integration Tests
**Test Files to Create/Update**:
- `tests/integration/workflows/test_workflow_execution.py` (UPDATE)
- `tests/integration/workflows/test_action_execution.py` (UPDATE)
**Test Scenarios**:
- [x] Full workflow execution with state management
- [x] Stage 1 → Stage 2 parameter generation
- [x] Document extraction → AI processing flow
- [x] Document reference lookup across tasks/rounds
- [x] JSON parsing with broken/incomplete JSON
**Test Files Created**:
- `tests/integration/workflows/test_workflow_execution.py` - Integration tests for workflow execution
---
### 9.3 End-to-End Validation
**Validation Checklist**:
- [x] Simple request (fast path) works
- [x] Complex request (full workflow) works
- [x] Document extraction works separately
- [x] AI calls work with extracted ContentParts
- [x] State management (rounds, tasks, actions) works correctly
- [x] Document references work across tasks/rounds
- [x] Stage 2 parameter generation works
- [x] All existing workflows still function
**Test Files Created**:
- `tests/validation/test_architecture_validation.py` - End-to-end validation tests
---
## Implementation Tracking
### Progress Tracking Table
| Phase | Component | Status | Assigned | Notes |
|-------|-----------|--------|----------|-------|
| 1.1 | Foundation Models | ⬜ Pending | - | - |
| 1.2 | Document References | ⬜ Pending | - | - |
| 2.1 | ChatWorkflow Enhancement | ⬜ Pending | - | - |
| 2.2 | Update Call Sites | ⬜ Pending | - | - |
| 3.1 | TaskContext Enhancement | ⬜ Pending | - | - |
| 3.2 | Remove SimpleNamespace | ⬜ Pending | - | - |
| 4.1 | Update Document References | ⬜ Pending | - | - |
| 5.1 | Create parseJsonWithModel | ⬜ Pending | - | - |
| 5.2 | Replace Manual Parsing | ⬜ Pending | - | - |
| 6.1 | Consolidate AI Service | ⬜ Pending | - | - |
| 6.2 | Update Call Sites | ⬜ Pending | - | - |
| 7.1 | Create ExtractContent Action | ⬜ Pending | - | - |
| 7.2 | Remove Extraction from AI | ⬜ Pending | - | - |
| 7.3 | Update Action Flow | ⬜ Pending | - | - |
| 8.1 | Remove Actionplan Mode Files | ⬜ Pending | - | - |
| 8.2 | Remove Actionplan References | ⬜ Pending | - | - |
| 8.3 | Remove WorkflowModeEnum.ACTIONPLAN | ⬜ Pending | - | - |
| 8.4 | Clean Up Related References | ⬜ Pending | - | - |
| 8.5 | Migration Strategy | ⬜ Pending | - | - |
| 9.1 | Unit Tests | ⬜ Pending | - | - |
| 9.2 | Integration Tests | ⬜ Pending | - | - |
| 9.3 | E2E Validation | ⬜ Pending | - | - |
**Status Legend**:
- ⬜ Pending
- 🟡 In Progress
- ✅ Complete
- ❌ Blocked
---
## Dependencies & Order
### Critical Path
```
Phase 1 (Foundation Models)
↓ (required for all other phases)
Phase 2 (State Management)
↓ (required for Phase 3)
Phase 3 (TaskContext)
↓ (required for Phase 4, 5)
Phase 4 (Document References) ─┐
Phase 5 (JSON Parsing) ├─> (can be parallel)
↓ │
Phase 6 (AI Service) │
↓ │
Phase 7 (Extraction Separation) ┘
Phase 8 (Testing)
```
### Parallel Work Opportunities
- **Phase 4 & 5**: Can be done in parallel (independent)
- **Phase 6.1 & 6.2**: Can be done in parallel with different files
- **Phase 7.1, 7.2, 7.3**: Can be done in parallel (different components)
---
## Risk Mitigation
### High-Risk Areas
1. **State Management (Phase 2)**
- **Risk**: Breaking existing workflows
- **Mitigation**: Feature flag, gradual rollout, extensive testing
2. **JSON Parsing (Phase 5)**
- **Risk**: Breaking AI response parsing
- **Mitigation**: Keep old parsing as fallback, test with real AI responses
3. **Extraction Separation (Phase 7)**
- **Risk**: Breaking document processing
- **Mitigation**: Test extraction → AI flow thoroughly, keep old path as fallback
### Rollback Strategy
- **Feature Flags**: Use flags to enable/disable new code paths
- **Backward Compatibility**: Keep old code paths until new code is validated
- **Database Migration**: Make state fields optional initially (default 0)
---
## File Reference Map
### New Files
| File | Purpose | Phase |
|------|---------|-------|
| `datamodelWorkflow.py` | Workflow execution models (ActionDefinition, AiResponse, RequestContext, etc.) | 1.1 |
| `datamodelDocref.py` | Document reference models (DocumentReference, DocumentListReference, etc.) | 1.2 |
### Modified Files
| File | Changes | Phase |
|------|---------|-------|
| `datamodelChat.py` | ChatWorkflow, TaskContext enhancements, remove WorkflowModeEnum.WORKFLOW_ACTIONPLAN | 2.1, 3.1, 8.3 |
| `jsonUtils.py` | Add parseJsonWithModel function | 5.1 |
| `workflowProcessor.py` | Remove index parameters, remove ActionplanMode | 2.2, 8.2 |
| `modeDynamic.py` | Remove SimpleNamespace, use typed models, update debugType | 3.2, 4.1, 5.2, 8.4 |
| `modeAutomation.py` | Remove ActionplanMode comments | 8.4 |
| `actionExecutor.py` | Remove index parameters | 2.2 |
| `mainServiceAi.py` | Consolidate methods, remove extraction | 6.1, 7.2 |
| `mainServiceChat.py` | Update document lookup | 4.1 |
| `methodAi.py` | Add extractContent action, use contentParts, callAiContent | 6.2, 7.1, 7.3 |
### Deleted Files
| File | Reason | Phase |
|------|--------|-------|
| `modeActionplan.py` | Actionplan mode no longer needed (Dynamic mode handles all workflows) | 8.1 |
| `promptGenerationActionsActionplan.py` | Only used by Actionplan mode | 8.1 |
---
## Quick Reference: Key Changes Summary
### Model Changes
- ✅ `ChatWorkflow` (in `datamodelChat.py`): Add `currentRound`, `currentTask`, `currentAction` + helper methods
- ✅ `TaskContext` (in `datamodelChat.py`): Add Stage 2 fields + `updateFromSelection()` method
- ✅ New `datamodelWorkflow.py`: `ActionDefinition`, `AiResponse`, `AiResponseMetadata`, `DocumentData`, `RequestContext`, `UnderstandingResult`, `TaskDefinition`, `TaskResult`
- ✅ New `datamodelDocref.py`: `DocumentReference`, `DocumentListReference`, `DocumentItemReference`, `DocumentReferenceList`
### Function Signature Changes
- ✅ Remove `taskIndex`, `actionIndex` parameters (use `workflow.getTaskIndex()`)
- ✅ `callAiContent()` replaces `callAiDocuments()` + `callAiText()`
- ✅ `parseJsonWithModel()` (in `jsonUtils.py`) replaces manual JSON parsing
### Workflow Changes
- ✅ Extract documents BEFORE AI calls (separate action)
- ✅ AI calls receive `contentParts` (not `documents`)
- ✅ Use typed `DocumentReferenceList` from `datamodelDocref.py` (not `List[str]`)
### Mode Changes
- ✅ Remove Actionplan mode (no longer needed)
- ✅ Dynamic mode handles all workflow execution
- ✅ Remove `WorkflowModeEnum.WORKFLOW_ACTIONPLAN` from enum
---
## Next Steps
1. **Review this plan** with team
2. **Assign phases** to developers
3. **Set up tracking** (use progress table)
4. **Start Phase 1** (Foundation Models)
5. **Iterate** through phases sequentially
---
## Questions & Notes
**Questions to Resolve**:
- [ ] Database migration strategy for `ChatWorkflow` state fields?
- [ ] Feature flag approach for gradual rollout?
- [ ] Backward compatibility requirements?
- [ ] Testing environment setup?
**Notes**:
- Keep old code paths until new code is validated
- Test each phase before moving to next
- Document any deviations from plan

View file

@ -0,0 +1 @@
<mxfile host="Electron" modified="2025-12-03T06:30:20.116Z" agent="5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/20.3.0 Chrome/104.0.5112.114 Electron/20.1.3 Safari/537.36" etag="tHyK4B66sFMeADf-beI2" version="20.3.0" type="device"><diagram name="Module Dependencies" id="module-dependencies">5Vldk5owFP01Pm4nEBT3cVf3ozPdJ6fT9jGGi7BG4sSg0l/fsCaaALYda8FRXuCem5DknOTey9DDo8X2RZBl8sYjYD0fRdseHvd83wuDgbqVSLFD/BDf75CZSCPd6gBM0p+gQaTRPI1g5TSUnDOZLl2Q8iwDKh2MCME3brOYM3fUJZlBDZhQwurotzSSiUYDhA6OV0hniRkaGc+CmNYaWCUk4hsLwk89PBKcy93TYjsCVtJniNn1ez7i3c9MQCb/pgNJKRew67UmLAcXftazlIVZu+B5FkHZ2+vhx02SSpgsCS29GyW3whK5YNodp4yNOOPioy+O4xgDUvhKCj4HywODviJp7zGs+uU7eCYnenzP2Lst4QXKrq9Yk7AGIWFrQZqBF+ALkKJQTYzXG2o59I7EWNsbS14jWWIpO9AY0Ttqtn/3gXP1oGlvlmAFYp1StZ+rIhwc55UhGiIU4iYZHvoIBafJUNqu1Oo6jzz3yFXHb1OdNJMgYtKkj+26bYWC0FUoHLSo0PwtGMsB+fw6HxZfKY2meZzf9Wt6QKRiuDYznqnboysRFzLhM54R9oXzpQbfQcpCs0hyyV3ZFGOi+K4MZIwfpfGpb8zx1naOC225skYQk5zJj1bRQ5mfFDhlnM4bdVaNnlNmprBbZ7m4oxlAQyueC7oPLTQXqSx08iRiBrIejep6C2BEpmt3rH8LfvZEnOBnHOfOQTAFaDpa09CjHu0qB/X77hHyUItHKAYic9EQ4g6O2w5wQ89V5y5oUZ0NF/OYlVVrVR7Lc159AMV+7DXpo9SJTtTnf1RqrR4TxahsOCQGPnOgohAAbZJgOEQQxF1JEODKUfDblKBMdIOaAn8g/oTM3ph2q7nZzcTv+WJpuCeC/o5tNxXb5b+diu1vs0qCbyEzlyMOb4TpapHfDdued1V0V0m9rK3tXdvePlbQX8beNuxeCd1uwXzs+6kzsoMbIbtSGXfEdv+q2K5wellR2w9vhetLiCOm0r8Stu3PN5tqN750RPV11X7HqL6MYgRfVzFyjGy3JuyIanwjVLcarZV5+EX84bN+teOnXw==</diagram></mxfile>

View file

@ -0,0 +1,721 @@
# JSON String Accumulation Concept for Iterative AI Generation
## Problem Statement
Currently, the AI service processes each iteration's JSON string independently, then merges parsed objects. However, the real-world behavior is:
1. AI delivers a **STRING** containing JSON (not a parsed JSON object)
2. First iteration: AI delivers a JSON string that's cut off somewhere (broken/incomplete)
3. Subsequent iterations: AI delivers MORE JSON string fragments that need to be **APPENDED** to the previous JSON string
4. Challenge: How to handle incomplete JSON strings and merge them correctly
## Core Principle
- **If iteration 1 returns complete, valid JSON** → Use it directly (no accumulation needed)
- **If iteration 1 returns incomplete/broken JSON** → Enter accumulation mode
## State Management
State class is defined in `datamodelAi.py`:
```python
class JsonAccumulationState(BaseModel):
accumulatedJsonString: str # Raw accumulated JSON string
isAccumulationMode: bool # True if we're accumulating fragments
lastParsedResult: Optional[Dict[str, Any]] # Last successfully parsed result (for prompt context)
allSections: List[Dict[str, Any]] # Sections extracted so far (for prompt context)
```
## Flow Logic
### Phase 1: First Iteration Check
```
1. Receive JSON string from AI
2. Try to parse:
- SUCCESS + Complete → Extract sections → DONE (no accumulation)
- FAILURE or INCOMPLETE → Enter accumulation mode
```
### Phase 2: Accumulation Mode (if needed)
```
For each iteration:
1. Receive newFragmentString
2. Concatenate with overlap handling:
accumulatedJsonString = mergeJsonStringsWithOverlap(
accumulatedJsonString,
newFragmentString
)
3. Try to parse accumulatedJsonString:
- SUCCESS → Go to Phase 3 (completion)
- FAILURE → Continue accumulation
4. Extract partial sections (for prompt context):
- Use repairBrokenJson() to get best partial structure
- Extract sections from partial structure
- Update allSections (for next prompt)
5. Build continuation context for next prompt:
- Extract delivered_summary: Count of items/rows/lines per section
- Extract cut_off_element: Incomplete element where JSON was cut off
- Extract element_before_cutoff: Last complete element before cut-off
- Store last_raw_json: Raw JSON string for reference
6. Keep accumulatedJsonString for next iteration
```
### Phase 3: Completion (when parsing succeeds)
```
1. Analyze completeness:
- Check if all structures are closed
- Identify missing closing elements
2. Add closing elements if needed:
- Close unclosed arrays/objects
- Ensure proper JSON structure
3. Repair if corrupted:
- Fix any remaining corruption
4. Extract final sections:
- ExtractSectionsFromDocument()
5. DONE
```
## Function Design
### Main Function: `accumulateAndParseJsonFragments`
```python
@staticmethod
def accumulateAndParseJsonFragments(
accumulatedJsonString: str,
newFragmentString: str,
allSections: List[Dict[str, Any]],
iteration: int
) -> Tuple[str, List[Dict[str, Any]], bool, Optional[Dict[str, Any]]]:
"""
Accumulate JSON fragments and parse when complete.
GENERIC function that handles:
1. Concatenating JSON strings with overlap detection
2. Parsing the accumulated string
3. Extracting sections (partial if incomplete, final if complete)
4. Determining completion status
Args:
accumulatedJsonString: Previously accumulated JSON string
newFragmentString: New fragment string from current iteration
allSections: Sections extracted so far (for prompt context)
iteration: Current iteration number
Returns:
Tuple of:
- accumulatedJsonString: Updated accumulated string
- sections: Extracted sections (partial if incomplete, final if complete)
- isComplete: True if JSON is complete and valid
- parsedResult: Parsed JSON object (if parsing succeeded)
"""
# Step 1: Clean encoding issues from accumulated string (check end of first delivered part)
cleanedAccumulated = cleanEncodingIssues(accumulatedJsonString)
# Step 2: Clean encoding issues from new fragment
cleanedFragment = cleanEncodingIssues(newFragmentString)
# Step 3: Concatenate with overlap handling
combinedString = mergeJsonStringsWithOverlap(
cleanedAccumulated,
cleanedFragment
)
# Step 4: Try to parse
try:
extracted = extractJsonString(combinedString)
parsedResult = json.loads(extracted)
# Step 5: Parsing succeeded - check completeness
isComplete = isJsonComplete(parsedResult)
if isComplete:
# Step 6: Complete JSON - finalize
finalizedJson = finalizeJson(parsedResult)
sections = extractSectionsFromDocument(finalizedJson)
return combinedString, sections, True, finalizedJson
else:
# Step 7: Incomplete but parseable - extract partial sections
sections = extractSectionsFromDocument(parsedResult)
return combinedString, sections, False, parsedResult
except json.JSONDecodeError:
# Step 8: Still broken - repair and extract partial sections
repaired = repairBrokenJson(combinedString)
if repaired:
sections = extractSectionsFromDocument(repaired)
return combinedString, sections, False, repaired
else:
# Repair failed - continue with data BEFORE merging the problematic piece
# Return previous accumulated string (before adding new fragment)
# This ensures we don't lose previously accumulated data
logger.warning(f"Iteration {iteration}: Repair failed, continuing with previous accumulated data")
return accumulatedJsonString, [], False, None
```
## Helper Functions Needed
### 1. `mergeJsonStringsWithOverlap`
```python
@staticmethod
def mergeJsonStringsWithOverlap(
accumulated: str,
newFragment: str
) -> str:
"""
GENERIC function to merge two JSON strings, handling overlaps intelligently.
Works for ANY JSON structure - no specific logic for content types.
Overlap scenarios (all handled generically):
- Exact continuation: newFragment starts exactly where accumulated ends
- Partial overlap: newFragment overlaps with end of accumulated
- Full overlap: newFragment is subset of accumulated
Strategy:
1. Find longest common suffix/prefix match (string-based comparison)
2. Remove duplicate content
3. Concatenate remaining parts
Args:
accumulated: Previously accumulated JSON string
newFragment: New fragment string to append
Returns:
Combined JSON string with overlaps removed
"""
# Implementation:
# - Find longest common suffix/prefix match
# - Remove overlapping part
# - Concatenate: accumulated + newFragment[overlapEnd:]
pass
```
### 2. `isJsonComplete`
```python
@staticmethod
def isJsonComplete(parsedJson: Dict[str, Any]) -> bool:
"""
GENERIC function to check if parsed JSON structure is complete.
Works for ANY JSON structure - no specific logic for content types.
Completeness checks (all generic):
- All arrays are properly closed
- All objects are properly closed
- No incomplete structures
- Recursive validation of nested structures
Args:
parsedJson: Parsed JSON object
Returns:
True if JSON is complete, False otherwise
"""
# Implementation:
# - Recursively check all structures
# - Verify no incomplete arrays/objects
# - Generic validation (no content-type-specific logic)
pass
```
### 3. `finalizeJson`
```python
@staticmethod
def finalizeJson(parsedJson: Dict[str, Any]) -> Dict[str, Any]:
"""
GENERIC function to finalize complete JSON by adding missing closing elements and repairing corruption.
Works for ANY JSON structure - no specific logic for content types.
Steps (all generic):
1. Analyze structure for missing closing elements (recursively)
2. Add closing brackets/braces where needed
3. Repair any remaining corruption
4. Validate final structure
Args:
parsedJson: Parsed JSON object that needs finalization
Returns:
Finalized JSON object
"""
# Implementation:
# - Check for incomplete structures (generic recursive)
# - Add missing closing elements
# - Repair corruption using existing repair logic
# - Return finalized structure
pass
```
### 4. `cleanEncodingIssues`
```python
@staticmethod
def cleanEncodingIssues(jsonString: str) -> str:
"""
GENERIC function to remove problematic encoding parts from JSON string.
Works for ANY JSON structure - removes problematic characters/bytes.
Args:
jsonString: JSON string that may have encoding issues
Returns:
Cleaned JSON string
"""
try:
# Try to decode/encode to detect issues
jsonString.encode('utf-8').decode('utf-8')
return jsonString
except UnicodeError:
# Remove problematic parts
cleaned = jsonString.encode('utf-8', errors='ignore').decode('utf-8', errors='ignore')
logger.warning("Removed encoding issues from JSON string")
return cleaned
```
### 5. `extractKpiFromResponse`
```python
@staticmethod
def extractKpiFromResponse(aiResponse: str) -> Optional[int]:
"""
Extract KPI percentage from AI response.
AI is asked: "Based on the delivered data so far, approximately what percentage (%)
of the total required content has been delivered? Respond with an integer between 0-100."
Args:
aiResponse: AI response string that may contain percentage
Returns:
Integer percentage (0-100) or None if not found
"""
# Implementation:
# - Look for percentage pattern in response (e.g., "45%", "45 percent", "45")
# - Extract integer value
# - Validate range (0-100)
# - Return integer or None
pass
```
### 6. `validateKpiProgression`
```python
@staticmethod
def validateKpiProgression(
accumulationState: JsonAccumulationState,
currentKpi: int
) -> bool:
"""
Validate KPI progression from AI response.
Validation rules:
- If % goes DOWN → Error (e.g., no data received, started new) → Return False
- If % doesn't move (increment < 1%) Error (no progress) Return False
- If % goes UP (increment >= 1%) → Good progress → Return True
Args:
accumulationState: Current accumulation state (contains lastKpi)
currentKpi: Current KPI percentage from AI (integer 0-100)
Returns:
True if KPI progression is valid, False if error detected
"""
# Implementation:
# - Get lastKpi from accumulationState
# - Calculate increment = currentKpi - lastKpi
# - If increment < 0: return False (went down - error)
# - If increment < 1: return False (no progress - error)
# - If increment >= 1: return True (progress - good)
pass
```
## Continuation Context for Next Prompt
### What is Delivered for Next Iteration Prompt
When accumulating JSON fragments, the system needs to provide context to the AI for the next iteration. This is handled by `buildContinuationContext()` which extracts:
1. **deliveredSummary**: Summary of all sections with counts
- Per section: content type, item/row/line counts
- Example: `- bullet_list with 20 items`, `- table "section_table" with 8 rows`
- Truncated if too long (first 10 + last 10 items)
2. **cutOffElement**: The incomplete element where JSON was cut off
- Extracted from `lastRawResponse` (raw JSON string)
- Shows AI where generation stopped
- Used as reference point for continuation
3. **elementBeforeCutoff**: The last complete element before the cut-off
- Provides context of what was completed
- Helps AI understand structure
4. **lastRawJson**: Raw JSON string from last iteration
- Stored for reference
- Used to detect fragments vs. full JSON structures
5. **kpiQuestion**: Question for AI to answer with percentage delivered
- "Based on the delivered data so far, approximately what percentage (%) of the total required content has been delivered? Respond with an integer between 0-100."
- AI must respond with integer percentage (0-100)
### Logic Flow
```
After each accumulation iteration:
1. Extract sections from accumulated JSON (even if incomplete)
2. Build continuation context:
- Count items/rows/lines per section (for deliveredSummary)
- Find incomplete section from allSections
- Extract cut-off point from lastRawResponse
3. Pass context to prompt builder for next iteration
4. AI uses context to continue from cut-off point
```
## Integration Point
### Modified `_extractSectionsFromResponse` in `mainServiceAi.py`
```python
def _extractSectionsFromResponse(
result: str,
iteration: int,
debugPrefix: str,
allSections: List[Dict[str, Any]] = None,
accumulationState: Optional[JsonAccumulationState] = None # NEW: Track accumulation state
) -> Tuple[List[Dict[str, Any]], bool, Optional[Dict[str, Any]], Optional[JsonAccumulationState]]:
"""
Extract sections from AI response, handling both valid and broken JSON.
NEW BEHAVIOR:
- First iteration: Check if complete, if not start accumulation
- Subsequent iterations: Accumulate strings, parse when complete
Returns:
Tuple of:
- sections: Extracted sections
- wasJsonComplete: True if JSON is complete
- parsedResult: Parsed JSON object
- updatedAccumulationState: Updated accumulation state (None if not in accumulation mode)
"""
if iteration == 1:
# First iteration - check if complete
try:
extracted = extractJsonString(result)
parsed = json.loads(extracted)
# Check completeness
if JsonResponseHandler.isJsonComplete(parsed):
# Complete JSON - no accumulation needed
sections = extractSectionsFromDocument(parsed)
return sections, True, parsed, None # No accumulation
except:
pass
# Incomplete - start accumulation
logger.info(f"Iteration 1: Incomplete JSON detected, starting accumulation mode")
accumulationState = JsonAccumulationState(
accumulatedJsonString=result,
isAccumulationMode=True,
lastParsedResult=None,
allSections=[]
)
return [], False, None, accumulationState
else:
# Subsequent iterations - accumulate
if accumulationState and accumulationState.isAccumulationMode:
accumulated, sections, isComplete, parsedResult = \
JsonResponseHandler.accumulateAndParseJsonFragments(
accumulationState.accumulatedJsonString,
result,
allSections,
iteration
)
# Update accumulation state
accumulationState.accumulatedJsonString = accumulated
accumulationState.lastParsedResult = parsedResult
accumulationState.allSections = allSections + sections if sections else allSections
accumulationState.isAccumulationMode = not isComplete
return sections, isComplete, parsedResult, accumulationState
else:
# No accumulation mode - process normally (shouldn't happen)
logger.warning(f"Iteration {iteration}: No accumulation state but iteration > 1")
return [], False, None, None
```
### Modified Loop in `mainServiceAi.py`
```python
# In the iteration loop:
accumulationState = None # Track accumulation state
for iteration in range(1, maxIterations + 1):
# ... AI call ...
# Extract sections with accumulation support
extractedSections, wasJsonComplete, parsedResult, accumulationState = \
self._extractSectionsFromResponse(
result,
iteration,
debugPrefix,
allSections,
accumulationState # Pass accumulation state object
)
# Update allSections for prompt context
if extractedSections:
allSections = JsonResponseHandler.mergeSectionsIntelligently(
allSections,
extractedSections,
iteration
)
# Build continuation context for next prompt (if needed)
if not wasJsonComplete and (allSections or result):
continuationContext = buildContinuationContext(allSections, result)
# Add KPI question for AI to answer (percentage delivered)
continuationContext["kpiQuestion"] = "Based on the delivered data so far, approximately what percentage (%) of the total required content has been delivered? Respond with an integer between 0-100."
# Use continuationContext in next prompt
# Extract KPI from AI response and validate progression
if accumulationState and accumulationState.isAccumulationMode:
currentKpi = JsonResponseHandler.extractKpiFromResponse(result) # Extract percentage from AI response
if currentKpi is not None:
if not JsonResponseHandler.validateKpiProgression(accumulationState, currentKpi):
logger.warning(f"Iteration {iteration}: KPI validation failed, stopping accumulation")
break
# Store KPI in accumulation state
accumulationState.lastKpi = currentKpi
# Check completion
if wasJsonComplete:
break # Done
```
## Key Considerations
### 1. Overlap Detection Strategy
**Question:** How to detect overlaps between accumulated string and new fragment?
**GENERIC Approach:**
- Compare end of accumulated string with start of new fragment
- Find longest matching suffix/prefix (string-based comparison)
- Remove duplicate content
- Works for ANY JSON structure (no content-type-specific logic)
### 2. Partial Section Extraction
**Question:** Should we extract sections from incomplete JSON for prompt context?
**Answer:** Yes, with generic approach:
- Extract what's available (even if incomplete) - works for ANY content type
- Use for continuation prompts (via `buildContinuationContext()`)
- Build delivered summary with counts per section (generic counting)
- Extract cut-off point from raw JSON string (generic detection)
- Keep accumulated string separate (for next append)
### 3. State Storage
**Question:** Where to store `accumulatedJsonString`?
**Answer:** Store in `JsonAccumulationState` object for traceability
- Use `JsonAccumulationState` class from `datamodelAi.py`
- Store accumulated string, mode flag, parsed result, and sections
- Better traceability and debugging
- Can be logged/persisted if needed
### 4. Completion Detection
**Question:** When is JSON considered "complete"?
**GENERIC Criteria:**
- Parses successfully without errors
- All structures are properly closed (recursive check)
- No incomplete arrays/objects
- Generic validation (no content-type-specific checks)
### 5. Error Handling
**Scenarios:**
- Repair fails → Continue accumulation (don't stop)
- Parsing fails after accumulation → Try repair, continue if repair succeeds
- Merge fails → Log error, continue with best available data
## Implementation Steps
1. **Add state class** in `datamodelAi.py`:
- `JsonAccumulationState` (camelStyle naming)
2. **Create helper functions** in `subJsonResponseHandling.py`:
- `mergeJsonStringsWithOverlap()` (generic, camelStyle)
- `isJsonComplete()` (generic, camelStyle)
- `finalizeJson()` (generic, camelStyle)
3. **Create main function** in `subJsonResponseHandling.py`:
- `accumulateAndParseJsonFragments()` (generic, camelStyle)
4. **Modify `_extractSectionsFromResponse`** in `mainServiceAi.py`:
- Add `accumulationState` parameter (JsonAccumulationState object)
- Add first iteration check
- Call accumulation function for subsequent iterations
- Update accumulation state object
5. **Update iteration loop** in `mainServiceAi.py`:
- Track `accumulationState` object (JsonAccumulationState)
- Pass to `_extractSectionsFromResponse`
- Build continuation context using `buildContinuationContext()`
- Add KPI question to continuation context
- Extract KPI from AI response and validate progression
- Handle return values
6. **Create test file**:
- Test string accumulation with overlaps
- Test completion detection
- Test partial section extraction
- Test continuation context building
## Testing Strategy
### Test Cases
1. **Complete JSON on first iteration:**
- Should NOT enter accumulation mode
- Should extract sections directly
2. **Incomplete JSON on first iteration:**
- Should enter accumulation mode
- Should store string for next iteration
3. **Fragment with exact continuation:**
- Should concatenate without duplicates
- Should parse successfully
4. **Fragment with overlap:**
- Should detect and remove overlap
- Should concatenate correctly
5. **Fragment with full overlap:**
- Should handle duplicate content
- Should not add duplicates
6. **Multiple iterations:**
- Should accumulate across all iterations
- Should extract partial sections for prompts
- Should complete when JSON is valid
## Open Questions - Answers
### 1. How to handle very large accumulated strings? (Memory concerns)
**Answer:** No memory problems expected
- System handles files up to ~1GB
- String accumulation is acceptable for this size
- No special memory management needed
### 2. Should we limit accumulation attempts? (Prevent infinite loops)
**Answer:** Yes, use KPI-based stopping
- Add generic KPI to iteration prompt showing remaining elements needed
- KPI calculation: Compare expected vs. delivered counts per section type
- Stop if KPI doesn't decrease in 3 consecutive iterations
- KPI is AI-provided (not calculated by system) - AI answers percentage question
- Simple integer comparison for validation (no fuzzy AI calculation)
**KPI Question for Iteration Prompt:**
```
=== PROGRESS INDICATOR ===
Based on the delivered data so far, approximately what percentage (%) of the total
required content has been delivered?
Respond with an integer between 0-100.
⚠️ IMPORTANT:
- If percentage goes DOWN in next iteration → Generation will stop (error detected)
- If percentage doesn't increase by at least 1% → Generation will stop (no progress)
- Only continue if percentage increases by 1% or more
```
**KPI Validation Logic:**
```python
def validateKpiProgression(
accumulationState: JsonAccumulationState,
currentKpi: int
) -> bool:
"""
Validate KPI progression from AI response.
Validation rules:
- If % goes DOWN → Error (e.g., no data received, started new) → Return False
- If % doesn't move (increment < 1%) Error (no progress) Return False
- If % goes UP (increment >= 1%) → Good progress → Return True
Args:
accumulationState: Current accumulation state (contains lastKpi)
currentKpi: Current KPI percentage from AI (integer 0-100)
Returns:
True if KPI progression is valid, False if error detected
"""
lastKpi = accumulationState.lastKpi if accumulationState.lastKpi else 0
increment = currentKpi - lastKpi
if increment < 0:
return False # Went down - error
if increment < 1:
return False # No progress - error
return True # Progress - good
```
### 3. How to handle encoding issues in string concatenation?
**Answer:** Remove problematic parts
- Detect encoding errors during concatenation
- Remove problematic characters/bytes
- Continue with cleaned string
- Acceptable to lose some data rather than fail completely
**Implementation:**
```python
def cleanEncodingIssues(jsonString: str) -> str:
"""
Remove problematic encoding parts from JSON string.
Generic approach:
- Detect encoding errors
- Remove problematic characters/bytes
- Return cleaned string
"""
try:
# Try to decode/encode to detect issues
jsonString.encode('utf-8').decode('utf-8')
return jsonString
except UnicodeError:
# Remove problematic parts
cleaned = jsonString.encode('utf-8', errors='ignore').decode('utf-8', errors='ignore')
logger.warning("Removed encoding issues from JSON string")
return cleaned
```
### 4. Should overlap detection be configurable? (Performance vs. accuracy)
**Answer:** No, automated mode only
- AI calls take 30-180 seconds (plenty of time for overlap detection)
- No performance concerns
- Always use automated overlap detection
- No configuration needed

View file

@ -0,0 +1,128 @@
# JSON String Accumulation Implementation Plan
## Modules to Modify
### 1. ✅ `datamodelAi.py` - COMPLETED
- Added `JsonAccumulationState` class with `lastKpi` field
### 2. `subJsonResponseHandling.py` - NEW FUNCTIONS NEEDED
**Location:** `poweron/gateway/modules/services/serviceAi/subJsonResponseHandling.py`
**Functions to add:**
1. `cleanEncodingIssues(jsonString: str) -> str`
- Clean encoding issues from JSON string
- Generic, works for any JSON structure
2. `mergeJsonStringsWithOverlap(accumulated: str, newFragment: str) -> str`
- Merge two JSON strings with overlap detection
- Find longest common suffix/prefix
- Remove duplicates
- Generic string-based comparison
3. `isJsonComplete(parsedJson: Dict[str, Any]) -> bool`
- Check if parsed JSON is complete
- Recursive validation of all structures
- Generic, no content-type-specific logic
4. `finalizeJson(parsedJson: Dict[str, Any]) -> Dict[str, Any]`
- Add missing closing elements
- Repair corruption
- Generic recursive approach
5. `extractKpiFromResponse(aiResponse: str) -> Optional[int]`
- Extract percentage (0-100) from AI response
- Look for patterns: "45%", "45 percent", "45"
- Validate range
6. `validateKpiProgression(accumulationState: JsonAccumulationState, currentKpi: int) -> bool`
- Validate KPI progression
- increment < 0 False (went down)
- increment < 1 False (no progress)
- increment >= 1 → True (progress)
7. `accumulateAndParseJsonFragments(accumulatedJsonString: str, newFragmentString: str, allSections: List[Dict], iteration: int) -> Tuple[str, List[Dict], bool, Optional[Dict]]`
- Main accumulation function
- Clean encoding, merge strings, parse, extract sections
- Return: (accumulatedString, sections, isComplete, parsedResult)
### 3. `mainServiceAi.py` - MODIFY EXISTING FUNCTIONS
**Location:** `poweron/gateway/modules/services/serviceAi/mainServiceAi.py`
**Changes needed:**
1. **Import JsonAccumulationState:**
```python
from modules.datamodels.datamodelAi import JsonAccumulationState
```
2. **Modify `_extractSectionsFromResponse()`:**
- Add parameter: `accumulationState: Optional[JsonAccumulationState] = None`
- Change return type to include `Optional[JsonAccumulationState]`
- Add first iteration check:
- Try to parse
- If complete → return sections, True, parsed, None
- If incomplete → create JsonAccumulationState, return [], False, None, state
- For subsequent iterations:
- If accumulationState exists → call `accumulateAndParseJsonFragments()`
- Update accumulationState object
- Return updated state
3. **Modify iteration loop (around line 200-350):**
- Add: `accumulationState = None` before loop
- Modify `_extractSectionsFromResponse()` call to pass and receive accumulationState
- After AI call, extract KPI from response:
```python
if accumulationState and accumulationState.isAccumulationMode:
currentKpi = JsonResponseHandler.extractKpiFromResponse(result)
if currentKpi is not None:
if not JsonResponseHandler.validateKpiProgression(accumulationState, currentKpi):
logger.warning(f"Iteration {iteration}: KPI validation failed, stopping")
break
accumulationState.lastKpi = currentKpi
```
- Update continuation context building to include KPI question
### 4. `jsonUtils.py` - MODIFY EXISTING FUNCTION
**Location:** `poweron/gateway/modules/shared/jsonUtils.py`
**Changes needed:**
1. **Modify `buildContinuationContext()`:**
- Change truncation from 100+100 to 10+10 items (line 722-727)
- Add `kpiQuestion` to context dict:
```python
context["kpiQuestion"] = "Based on the delivered data so far, approximately what percentage (%) of the total required content has been delivered? Respond with an integer between 0-100."
```
### 5. `subPromptBuilderGeneration.py` - MODIFY EXISTING FUNCTION
**Location:** `poweron/gateway/modules/services/serviceGeneration/subPromptBuilderGeneration.py`
**Changes needed:**
1. **Modify `buildGenerationPrompt()`:**
- Check for `kpiQuestion` in continuationContext
- Add KPI question to continuation prompt if present:
```python
if continuationContext and continuationContext.get("kpiQuestion"):
continuationText += f"\n\n=== PROGRESS INDICATOR ===\n{continuationContext['kpiQuestion']}\n\n⚠ IMPORTANT:\n- If percentage goes DOWN in next iteration → Generation will stop (error detected)\n- If percentage doesn't increase by at least 1% → Generation will stop (no progress)\n- Only continue if percentage increases by 1% or more\n"
```
## Implementation Order
1. ✅ Add JsonAccumulationState to datamodelAi.py (DONE)
2. Add helper functions to subJsonResponseHandling.py
3. Add main function accumulateAndParseJsonFragments to subJsonResponseHandling.py
4. Modify _extractSectionsFromResponse in mainServiceAi.py
5. Modify iteration loop in mainServiceAi.py
6. Update buildContinuationContext in jsonUtils.py
7. Update buildGenerationPrompt in subPromptBuilderGeneration.py
## Testing Considerations
- Test complete JSON on first iteration (should NOT accumulate)
- Test incomplete JSON on first iteration (should start accumulation)
- Test string accumulation with overlaps
- Test encoding cleanup
- Test KPI extraction and validation
- Test repair failure handling (should continue with previous data)

View file

@ -0,0 +1,100 @@
# JSON String Accumulation KPI Adaptation Plan
## Changes Required
### 1. Remove Current KPI Approach
- Remove KPI question from `buildContinuationContext()` in `jsonUtils.py`
- Remove KPI question display from `buildGenerationPrompt()` in `subPromptBuilderGeneration.py`
- Remove `extractKpiFromResponse()` and `validateKpiProgression()` from `subJsonResponseHandling.py` (or adapt them)
### 2. Add Separate AI Call for KPI Definition
**When:** After detecting incomplete JSON on iteration 1, BEFORE entering accumulation mode
**Input to AI:**
- User prompt
- Delivered data summary
- Last complete element
- Cut-off element
- Parsed JSON structure (if available)
**AI Task:** Analyze the prompt and delivered data to define:
- Which JSON items to track (e.g., "items in bullet_list", "rows in table", "elements in array")
- Target values for each tracked item
- JSON paths/selectors to extract values
**Output Format:**
```json
{
"kpis": [
{
"id": "prime_numbers_count",
"description": "Number of prime numbers in the list",
"jsonPath": "sections[0].elements[0].items",
"targetValue": 4000,
"currentValue": 0
},
{
"id": "table_rows",
"description": "Rows in the data table",
"jsonPath": "sections[1].elements[0].rows",
"targetValue": 100,
"currentValue": 0
}
]
}
```
### 3. Update JsonAccumulationState
**Replace:**
- `lastKpi: Optional[int]`
**With:**
- `kpiDefinitions: List[Dict[str, Any]]` - KPI definitions from AI call
- `lastKpiValues: Dict[str, int]` - Last values for each KPI (keyed by KPI id)
### 4. Add KPI Extraction Function
**Function:** `extractKpiValuesFromJson(parsedJson: Dict, kpiDefinitions: List[Dict]) -> Dict[str, int]`
- Extract current values from parsed JSON using JSON paths
- Return dict: `{kpi_id: current_value}`
### 5. Update KPI Validation Logic
**New Function:** `validateKpiProgression(accumulationState: JsonAccumulationState, currentKpiValues: Dict[str, int]) -> Tuple[bool, str]`
- Returns: `(shouldProceed, reason)`
- Logic:
- Extract current values from parsed JSON
- Compare with last values
- **Proceed if:** At least ONE KPI increased
- **Stop if:** Any KPI went backwards → return (False, "KPI went backwards")
- **Stop if:** No KPIs progressed → return (False, "No progress")
- **Finish if:** All KPIs completed OR JSON is complete → return (True, "Complete")
### 6. Update Flow in mainServiceAi.py
**Iteration 1:**
1. Try to parse JSON
2. If complete → done (no accumulation)
3. If incomplete:
- Extract sections from partial JSON
- Build continuation context
- **NEW: Make separate AI call to define KPIs**
- Create accumulationState with KPI definitions
- Enter accumulation mode
**Subsequent Iterations:**
1. Accumulate JSON strings
2. Parse accumulated JSON
3. Extract current KPI values from parsed JSON
4. Validate KPI progression
5. Update accumulationState.lastKpiValues
6. Continue or stop based on validation
## Implementation Steps
1. Update `JsonAccumulationState` in `datamodelAi.py`
2. Remove KPI question from `jsonUtils.py` and `subPromptBuilderGeneration.py`
3. Create `defineKpisFromPrompt()` function in `subJsonResponseHandling.py` (or new module)
4. Create `extractKpiValuesFromJson()` function in `subJsonResponseHandling.py`
5. Update `validateKpiProgression()` in `subJsonResponseHandling.py`
6. Update `_extractSectionsFromResponse()` in `mainServiceAi.py` to call KPI definition AI
7. Update iteration loop to extract and validate KPIs

514
appdoc/loop_plan.md Normal file
View file

@ -0,0 +1,514 @@
# Refactoring Plan: Integrate Intent Analysis into Existing Prompts and Simplify AI Loop Mode
## Overview
**Integrate** IntentAnalyzer logic into existing prompts (userintention, taskplan, dynamic) instead of making separate AI calls (saves 3 AI calls). Simplify AI loop logic to: **Complete JSON = Stop, Cut-off JSON = Continue**. Remove Definition of Done (DoD) logic entirely.
**Key Change**: Keep intent analysis logic (dataType, expectedFormats, qualityRequirements, etc.) but merge it into existing prompts instead of separate calls.
**Overlap Analysis**:
- **UserIntention prompt** already does: language detection, normalization, intent extraction
- **Intent Analysis prompt** does: primaryGoal, dataType, expectedFormats, qualityRequirements, successCriteria, language detection
- **Overlap**: Language detection (both do it)
- **Solution**: Merge intent analysis fields into userintention prompt (one call instead of two)
## Changes Required
### Phase 1: Integrate Intent Analysis into Existing Prompts
#### 1. Update UserIntention Prompt (Integrate Intent Analysis)
- **File**: `gateway/modules/workflows/workflowManager.py`
- **Lines**: 353-378
- **Changes**:
- Merge intent analysis fields (from intentAnalyzer.py lines 2-31) into userintention prompt
- Add fields: `primaryGoal`, `dataType`, `expectedFormats`, `qualityRequirements`, `successCriteria`
- Keep existing fields: `detectedLanguage`, `normalizedRequest`, `intent`, `contextItems`
- **Integration**: Combine both prompts into one - userintention already does language detection and normalization, now also does full intent analysis
- **Note**: Adding 5 items is no problem for AI - prompt complexity is acceptable
- **CRITICAL**: Intent check should be different on workflow, task, and action levels (keep separate)
- **New Prompt Structure**:
```python
analyzerPrompt = (
"You are an input analyzer. From the user's message, perform ALL of the following in one pass:\n"
"1) detectedLanguage: detect ISO 639-1 language code (e.g., de, en).\n"
"2) normalizedRequest: full, explicit restatement of the user's request in the detected language; do NOT summarize; preserve ALL constraints and details.\n"
"3) intent: concise single-paragraph core request in the detected language for high-level routing.\n"
"4) contextItems: supportive data blocks to attach as separate documents if significantly larger than the intent.\n"
"5) primaryGoal: The main objective the user wants to achieve.\n"
"6) dataType: What type of data/content they want (numbers|text|documents|analysis|code|unknown).\n"
"7) expectedFormats: What file format(s) they expect - provide matching file format extensions list (e.g., [\"xlsx\", \"pdf\"]). If format is unclear or not specified, use empty list [].\n"
"8) qualityRequirements: Quality requirements they have (accuracy, completeness) as {accuracyThreshold: 0.0-1.0, completenessThreshold: 0.0-1.0}.\n"
"9) successCriteria: Specific success criteria that define completion (array of strings).\n\n"
"Rules:\n"
"- If total content (intent + data) is < 10% of model max tokens, do not extract; return empty contextItems and keep intent compact and self-contained.\n"
"- If content exceeds that threshold, move bulky parts into contextItems; keep intent short and clear.\n"
"- Preserve critical references (URLs, filenames) in intent.\n"
"- Normalize to the primary detected language if mixed-language.\n\n"
"Return ONLY JSON (no markdown) with this shape:\n"
"{\n"
" \"detectedLanguage\": \"de|en|fr|it|...\",\n"
" \"normalizedRequest\": \"Full explicit instruction in detected language\",\n"
" \"intent\": \"Concise normalized request...\",\n"
" \"contextItems\": [...],\n"
" \"primaryGoal\": \"The main objective the user wants to achieve\",\n"
" \"dataType\": \"numbers|text|documents|analysis|code|unknown\",\n"
" \"expectedFormats\": [\"pdf\", \"docx\", \"xlsx\", ...],\n"
" \"qualityRequirements\": {\n"
" \"accuracyThreshold\": 0.0-1.0,\n"
" \"completenessThreshold\": 0.0-1.0\n"
" },\n"
" \"successCriteria\": [\"specific criterion 1\", \"specific criterion 2\"]\n"
"}\n\n"
f"User message:\n{self.services.utils.sanitizePromptContent(userInput.prompt, 'userinput')}"
)
```
- **Update parsing** (lines 397-402): Extract new fields and store in workflow object
- Store as `workflow._workflowIntent` for reuse
#### 2. Remove IntentAnalyzer Class and Calls
- **File**: `gateway/modules/workflows/processing/adaptive/intentAnalyzer.py`
- **Action**: Delete entire file (logic now integrated into prompts)
- **File**: `gateway/modules/workflows/processing/adaptive/__init__.py`
- **Action**: Remove `IntentAnalyzer` from exports
- **File**: `gateway/modules/workflows/processing/core/taskPlanner.py`
- **Line 56**: Remove `workflowIntent = await intentAnalyzer.analyzeUserIntent(actualUserPrompt, None)`
- **Line 60**: Use `workflowIntent` from workflow object (set in workflowManager)
- **Lines 167-173**: Use workflowIntent from workflow object for dataType/expectedFormats/qualityRequirements
- **File**: `gateway/modules/workflows/processing/modes/modeDynamic.py`
- **Line 36**: Remove `self.intentAnalyzer = IntentAnalyzer(services)`
- **Line 66**: Use `workflow._workflowIntent` from workflow object (already set)
- **Line 72**: Remove `self.taskIntent = await self.intentAnalyzer.analyzeUserIntent(taskStep.objective, context)`
- **Line 362**: Remove `actionIntent = await self.intentAnalyzer.analyzeUserIntent(actionObjective, context)`
- **Lines 60-67**: Use existing workflowIntent from workflow object
- **Lines 359-373**: Remove actionIntent creation logic (will be integrated into dynamic prompts)
#### 3. Remove IntentAnalyzer Imports
- **File**: `taskPlanner.py` (line 12): Remove `from modules.workflows.processing.adaptive import IntentAnalyzer`
- **File**: `modeDynamic.py` (line 25): Remove `IntentAnalyzer` from imports
### Phase 2: Integrate Information Gathering into Existing Prompts
#### 4. Update Taskplan Prompt (Use Workflow Intent from UserIntention, Allow Override)
- **File**: `gateway/modules/workflows/processing/shared/promptGenerationTaskplan.py`
- **Changes**:
- Use `workflowIntent` from workflow object (already set in workflowManager from userintention analysis)
- Pass workflowIntent fields as context to taskplan prompt via placeholders
- **CRITICAL**: Allow taskplan to **override workflow intent** if task-specific needs differ
- Example: Workflow wants PDF, but task needs CSV for intermediate step
- Taskplan prompt can reference workflow intent but can override with task-specific values
- **If taskplan needs task-specific intent analysis, add fields to task JSON**:
```json
{
"overview": "...",
"userMessage": "...",
"tasks": [
{
"id": "task_1",
"objective": "...",
"dataType": "numbers|text|documents|analysis|code|unknown", // Inherit from workflow or task-specific
"expectedFormats": ["pdf", "docx", ...], // Inherit from workflow or task-specific
"qualityRequirements": {...}, // Inherit from workflow or task-specific
...
}
]
}
```
- Update `taskPlanner.py` to use workflowIntent from workflow object (line 60)
- Extract task-specific fields from task plan response if provided
#### 5. Update Dynamic Plan Selection Prompt (Integrate Action Intent Analysis)
- **File**: `gateway/modules/workflows/processing/shared/promptGenerationActionsDynamic.py`
- **Function**: `generateDynamicPlanSelectionPrompt`
- **Changes**:
- Integrate intent analysis into action selection prompt
- Add intent analysis instructions to prompt template
- Add to JSON response structure:
```json
{
"action": "...",
"actionObjective": "...",
"dataType": "numbers|text|documents|analysis|code|unknown", // Analyze from actionObjective
"expectedFormats": ["pdf", "docx", "xlsx", ...], // Analyze from actionObjective
"qualityRequirements": {
"accuracyThreshold": 0.0-1.0,
"completenessThreshold": 0.0-1.0
}, // Analyze from actionObjective
"successCriteria": ["specific criterion 1", ...], // Analyze from actionObjective
"userMessage": "...",
"learnings": [...],
"requiredInputDocuments": [...],
"requiredConnection": "...",
"parametersContext": "..."
}
```
- Update prompt instructions to analyze actionObjective for these fields
- Extract these fields in `modeDynamic.py` when processing selection
- Store as `workflow._actionIntent` for use in AI loop (but without DoD)
#### 6. Update Dynamic Parameters Prompt
- **File**: `gateway/modules/workflows/processing/shared/promptGenerationActionsDynamic.py`
- **Function**: `generateDynamicParametersPrompt`
- **Changes**:
- Add completion criteria description (natural language, not DoD metrics)
- Ask AI to describe what "complete" means for this action
- Example: "This action is complete when: [description]"
- Use natural language completion criteria instead of DoD metrics
### Phase 3: Simplify AI Loop Logic
#### 7. Simplify `_shouldContinueGeneration`
- **File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
- **Lines**: 904-943
- **Changes**:
- Remove DoD/KPI checking logic
- Remove `workflowIntent` parameter usage for DoD
- Remove `_analyzeTaskCompletion` call (verified: not called anywhere)
- **CRITICAL**: JSON completeness is determined by **parsing**, NOT by checking last character!
- Last character check (line 860) is **WRONG** - `}` or `]` could be by chance, JSON still incomplete
- **New Logic**:
```python
def _shouldContinueGeneration(
self,
allSections: List[Dict[str, Any]],
iteration: int,
wasJsonComplete: bool,
rawResponse: str = None
) -> bool:
"""
Determine if AI generation loop should continue.
Simple logic:
- If JSON parsing failed or incomplete → continue (needs more content)
- If JSON parses successfully and is complete → stop (all content delivered)
- Loop detection prevents infinite loops
CRITICAL: JSON completeness is determined by parsing, NOT by last character check!
Returns True if we should continue, False if AI Loop is done.
"""
if len(allSections) == 0:
return True # No sections yet, continue
# CRITERION 1: If JSON was incomplete/broken (parsing failed or incomplete) - continue to repair/complete
if not wasJsonComplete:
logger.info(f"Iteration {iteration}: JSON incomplete/broken - continuing to complete")
return True
# CRITERION 2: JSON is complete (parsed successfully) - check for loop detection
if self._isStuckInLoop(allSections, iteration):
logger.warning(f"Iteration {iteration}: Detected potential infinite loop - stopping AI loop")
return False
# JSON is complete and not stuck in loop - done
logger.info(f"Iteration {iteration}: JSON complete - AI loop done")
return False
```
- Remove `userPrompt` and `workflowIntent` parameters (no longer needed)
- **Update `_extractSectionsFromResponse`**: Remove last character check (line 860), rely only on JSON parsing
#### 8. Remove `_analyzeTaskCompletion` and Check DoD Usage
- **File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
- **Lines**: 945-1090
- **Action**: Delete entire method (verified: not called anywhere in codebase)
- **Action**: Remove all references to this method
- **CRITICAL**: Check deeply in code how completeness checks are handled:
- `_refineDecide` (modeDynamic.py line 693) uses content validation and analysis
- ContentValidator checks content quality and requirements
- ProgressTracker tracks progress state
- **Verify**: DoD checking happens in refinement/validation phase, NOT in AI loop
- **Action**: Ensure validation/refinement phase still checks requirements after DoD removal
#### 9. Revise `buildContinuationContext`
- **File**: `gateway/modules/shared/jsonUtils.py`
- **Lines**: 448-1016
- **Changes**:
**A. New Summary Format** (per-section counts):
```
Following data has already been delivered:
- heading "id" <list of elements.level . elements.text>
- paragraph with <count> texts
- bullet_list with <count> items
- table "id" with <count> rows
- code_block "id" with <count> code lines
Check if with already delivered data and the last delivered data part the full response is delivered.
If not, deliver the remaining part.
```
**Rules**:
- If section has no ID, **omit it** from summary (don't show "unknown")
- If summary is too long (exceeds token limit), truncate: show **first 100 items and last 100 items** (remove middle)
**B. New Extraction Algorithm**:
1. Loop over all sections of the JSON (as it is, cut off), until a section is not complete
2. **CRITICAL**: There is always only **one section incomplete** (JSON cut-off point)
3. In the cut off section, loop through all elements, until an element is cut off
4. **Edge case**: If cut-off is in first element, just show cut-off element (no element before exists)
5. **Normal case**: Return cut-off element AND the element before it to give to the next iteration prompt
6. **CRITICAL**: In 99% of cases, JSON is cut off mid-string or mid-number - deliver the cut-off part **as-is** (don't try to "complete" it)
7. **Performance**: No problem - we only parse one AI response, not all accumulated sections
**C. Implementation**:
```python
def buildContinuationContext(allSections: List[Dict[str, Any]], lastRawResponse: Optional[str] = None) -> Dict[str, Any]:
"""
Build context information from accumulated sections for continuation prompt.
Returns summary of delivered data and cut-off point for continuation.
"""
context = {
"section_count": len(allSections),
}
# Build summary of delivered data (per-section counts)
summary_lines = []
summary_lines.append("Following data has already been delivered:\n")
for section in allSections:
section_id = section.get("id")
# CRITICAL: If section has no ID, omit it from summary
if not section_id:
continue
content_type = section.get("content_type", "")
elements = section.get("elements", [])
if isinstance(elements, list) and elements:
elem = elements[-1] if elements else {}
else:
elem = elements if isinstance(elements, dict) else {}
if isinstance(elem, dict):
if content_type == "heading":
level = elem.get("level", "")
text = elem.get("text", "")
summary_lines.append(f'- heading "{section_id}" level {level}: {text}')
elif content_type == "paragraph":
# Count text elements
text_count = sum(1 for e in (elements if isinstance(elements, list) else [elem])
if isinstance(e, dict) and e.get("text"))
summary_lines.append(f'- paragraph with {text_count} text(s)')
elif content_type in ["bullet_list", "numbered_list"]:
items = elem.get("items", [])
item_count = len(items) if isinstance(items, list) else 0
summary_lines.append(f'- bullet_list with {item_count} items')
elif content_type == "table":
rows = elem.get("rows", [])
row_count = len(rows) if isinstance(rows, list) else 0
summary_lines.append(f'- table "{section_id}" with {row_count} rows')
elif content_type == "code_block":
code = elem.get("code", "")
if code:
lines = [l for l in code.split('\n') if l.strip()]
line_count = len(lines)
summary_lines.append(f'- code_block "{section_id}" with {line_count} code lines")
# CRITICAL: If summary is too long, truncate: show first 100 and last 100 items
summary_text = "\n".join(summary_lines)
if len(summary_lines) > 200: # More than 200 lines
first_100 = "\n".join(summary_lines[:100])
last_100 = "\n".join(summary_lines[-100:])
summary_text = f"{first_100}\n... (truncated {len(summary_lines) - 200} items) ...\n{last_100}"
context["delivered_summary"] = summary_text
# Extract cut-off point using new algorithm
# 1. Loop over all sections until finding incomplete section
incomplete_section = None
incomplete_section_index = -1
for i, section in enumerate(allSections):
if self._isSectionIncomplete(section):
incomplete_section = section
incomplete_section_index = i
break
# 2. In incomplete section, loop through elements until finding cut-off element
# CRITICAL: There is always only ONE section incomplete (JSON cut-off point)
cut_off_element = None
element_before_cutoff = None
if incomplete_section:
elements = incomplete_section.get("elements", [])
if isinstance(elements, list):
for i, elem in enumerate(elements):
if self._isElementIncomplete(elem):
cut_off_element = elem
# Edge case: If cut-off is in first element, no element before exists
if i > 0:
element_before_cutoff = elements[i-1]
break
# 3. Extract from lastRawResponse if available
# CRITICAL: In 99% of cases, JSON is cut off mid-string or mid-number
# Deliver the cut-off part AS-IS (don't try to "complete" it)
if lastRawResponse and not cut_off_element:
# Try to extract cut-off element from raw response
# Extract exactly as-is, even if mid-string/mid-number
cut_off_element = self._extractCutOffElementFromRaw(lastRawResponse, incomplete_section)
context["element_before_cutoff"] = element_before_cutoff
context["cut_off_element"] = cut_off_element
return context
```
**D. Keep Existing Merge Logic**:
- Keep `_mergeSectionsIntelligently` (works well)
- Keep `_mergeSectionContent` (works well)
- Keep `_mergeCodeBlocks` (works well)
#### 10. Update `buildGenerationPrompt`
- **File**: `gateway/modules/services/serviceGeneration/subPromptBuilderGeneration.py`
- **Lines**: 48-171
- **Changes**:
- Remove DoD references (line 78)
- Update continuation prompt to use new summary format:
```python
if hasContinuation:
delivered_summary = continuationContext.get("delivered_summary", "")
element_before_cutoff = continuationContext.get("element_before_cutoff")
cut_off_element = continuationContext.get("cut_off_element")
continuationText = f"""{delivered_summary}
Check if with already delivered data and the last delivered data part the full response is delivered.
If not, deliver the remaining part.
Last complete element before cut-off: {element_before_cutoff}
Cut-off element (incomplete): {cut_off_element}
Continue from the incomplete element above - complete it first, then add NEW items."""
```
- Remove progress stats based on DoD
- Use simple section counts instead
#### 11. Clean Up `_callAiWithLooping`
- **File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
- **Lines**: 162-365
- **Changes**:
- Remove `workflowIntent` parameter usage for DoD (line 223-224)
- Remove `_analyzeTaskCompletion` call (if any)
- Simplify `_shouldContinueGeneration` call (remove workflowIntent parameter)
- Update continuation context building:
```python
continuationContext = buildContinuationContext(allSections, lastRawResponse)
# Remove taskIntent from continuationContext - no longer needed
```
### Phase 4: Clean Up References
#### 12. Remove DoD References
- **File**: `gateway/modules/workflows/processing/modes/modeDynamic.py`
- Remove DoD extraction from intents (line 365)
- Remove workflowIntent/taskIntent/actionIntent DoD usage
- **File**: `gateway/modules/services/serviceGeneration/subPromptBuilderGeneration.py`
- Remove DoD from continuation context (line 78)
- **File**: `gateway/modules/services/serviceAi/mainServiceAi.py`
- Remove all DoD-related code
#### 13. Update Workflow Intent Storage
- **File**: `modeDynamic.py`
- **Changes**:
- Remove `workflow._workflowIntent` storage (or keep empty structure if needed elsewhere)
- Remove `workflow._taskIntent` storage
- Remove `workflow._actionIntent` storage
- Or keep them but remove DoD fields
## Implementation Order
1. ✅ **Remove IntentAnalyzer class and all calls** (Phase 1)
2. ✅ **Update prompts to gather info directly** (Phase 2)
- Update UserIntention prompt (integrate intent analysis)
- Update Taskplan prompt (allow workflow intent override)
- Update Dynamic prompts (integrate action-level intent)
3. ✅ **Fix JSON completeness check** (Phase 3, Step 7)
- Remove last character check (line 860)
- Use JSON parsing only (`json.loads()`)
4. ✅ **Simplify `_shouldContinueGeneration`** (Phase 3, Step 7)
5. ✅ **Remove `_analyzeTaskCompletion`** (Phase 3, Step 8)
- Verify DoD checking in refinement phase (`_refineDecide`, ContentValidator)
6. ✅ **Revise `buildContinuationContext`** (Phase 3, Step 9)
- Implement new summary format
- Implement new extraction algorithm
- Handle edge cases (first element, mid-string/number cuts)
- Omit sections without ID
- Truncate summary if too long (first 100 + last 100)
7. ✅ **Update `buildGenerationPrompt`** (Phase 3, Step 10)
8. ✅ **Clean up `_callAiWithLooping`** (Phase 3, Step 11)
9. ✅ **Remove all DoD references** (Phase 4)
10. ✅ **Testing and validation** (Post-implementation)
- Test UserIntention prompt quality (verify all fields extracted correctly)
- Test extraction edge cases
- Test task-level intent override
- Add comprehensive unit tests
11. ✅ **Documentation** (Post-implementation)
- Update docs to explain new loop behavior
- Document JSON completeness check (parsing-based)
- Document continuation summary format
## Expected Benefits
- **Saves 3 AI calls per workflow** (workflow intent, task intent, action intent - now integrated into existing calls)
- **Simpler loop logic** (JSON completeness only, no DoD checking)
- **Clearer continuation prompts** (section counts instead of DoD metrics)
- **More precise cut-off detection** (element-level detection)
- **Better performance** (fewer AI calls = faster execution)
- **Same information gathered** (intent analysis logic preserved, just integrated)
## Testing Checklist
### Functional Tests
- [ ] Task planning works without intent analysis
- [ ] Dynamic mode action selection works without intent analysis
- [ ] Dynamic mode parameter generation works without intent analysis
- [ ] AI loop stops when JSON is complete (parsing-based check)
- [ ] AI loop continues when JSON is cut off (parsing fails)
- [ ] Continuation prompts show correct section counts
- [ ] Cut-off element extraction works correctly
- [ ] Merge logic still works correctly
- [ ] No DoD references remain in codebase
### Edge Case Tests (from Critical Analysis)
- [ ] JSON completeness: Nested structures that don't parse correctly
- [ ] JSON completeness: Mid-string cuts (e.g., `"text": "incomplete`)
- [ ] JSON completeness: Mid-number cuts (e.g., `"value": 1234`)
- [ ] Extraction: Cut-off in first element (no element before)
- [ ] Extraction: Only one incomplete section (not multiple)
- [ ] Summary: Sections without ID are omitted
- [ ] Summary: Truncation works (first 100 + last 100 if > 200 items)
- [ ] Task-level intent override works (task can override workflow intent)
### Quality Tests
- [ ] UserIntention prompt: All 9 fields extracted correctly
- [ ] UserIntention prompt: Quality maintained after adding 5 fields
- [ ] Taskplan prompt: Can override workflow intent when needed
- [ ] Dynamic prompts: Action-level intent analysis works correctly
## Notes
### Implementation Notes
- **Keep existing merge logic** - it works well and doesn't need changes
- **New extraction algorithm** focuses on finding the cut-off point more precisely
- **Summary format** provides clear progress without DoD thresholds
- **Intent analysis logic preserved** - just integrated into existing prompts instead of separate calls
- **UserIntention prompt** already does language detection and normalization - now also does full intent analysis
- **Taskplan prompt** can use workflowIntent from userintention (no re-analysis needed), but can override if task-specific needs differ
- **Dynamic prompts** integrate action-level intent analysis (no separate call needed)
- **DoD removed** - but dataType/expectedFormats/qualityRequirements still gathered for other purposes
### Critical Requirements (from Critical Analysis)
- ✅ **JSON completeness**: Use parsing (`json.loads()`), NOT last character check
- ✅ **Intent levels**: Keep separate checks for workflow, task, and action levels
- ✅ **Task-level override**: Critical requirement - allow taskplan to override workflow intent
- ✅ **Sections without ID**: Omit from summary (don't show "unknown")
- ✅ **Summary truncation**: Show first 100 and last 100 items if > 200 items
- ✅ **Extraction edge cases**: Handle first element, only one incomplete section, deliver mid-string/number as-is
- ✅ **DoD checking**: Verified in refinement phase (`_refineDecide`, ContentValidator), not in AI loop

View file

@ -0,0 +1,346 @@
# Critical Analysis: Loop Plan Refactoring
## Executive Summary
**Overall Assessment**: ✅ **The proposal is fundamentally sound and well-reasoned**, with some areas that need careful consideration. The core idea of separating JSON completeness from task completion is architecturally correct.
**Key Corrections from User**:
1. ✅ **JSON completeness**: Use **parsing** (`json.loads()`), NOT last character check (line 860 is wrong)
2. ✅ **Intent levels**: Keep separate checks for workflow, task, and action levels
3. ✅ **Task-level override**: Critical requirement - allow taskplan to override workflow intent
4. ✅ **Sections without ID**: Omit from summary (don't show "unknown")
5. ✅ **Summary truncation**: Show first 100 and last 100 items if too long
6. ✅ **Extraction algorithm**: Only one incomplete section, handle first element edge case, deliver mid-string/number as-is
7. ✅ **UserIntention prompt**: Adding 5 items is acceptable for AI
8. ✅ **DoD checking**: Verified in refinement phase (`_refineDecide`, ContentValidator)
---
## ✅ **CLEVER ASPECTS**
### 1. **Separation of Concerns: JSON Completeness vs. Task Completion**
**Rating**: ⭐⭐⭐⭐⭐ Excellent
**Why it's clever:**
- The current code already hints at this separation (line 916: "CRITICAL: This is ONLY about AI Loop Completion, NOT Action DoD!")
- **JSON completeness** = "Did the AI finish generating the JSON structure?" (technical)
- **Task completion** = "Does the content meet user requirements?" (semantic)
- These are **orthogonal concerns** - mixing them causes confusion
**Evidence from code:**
```python
# Current _shouldContinueGeneration already separates these!
# Line 916: "CRITICAL: This is ONLY about AI Loop Completion, NOT Action DoD!"
# Line 917: "Action DoD is checked AFTER the AI Loop completes in _refineDecide."
```
**Benefit**: Simpler, more predictable loop behavior. JSON completeness is objective and testable.
---
### 2. **Integrating Intent Analysis into Existing Prompts**
**Rating**: ⭐⭐⭐⭐ Very Good (with caveats)
**Why it's clever:**
- **Saves 3 AI calls** = significant performance improvement
- Reduces latency (fewer round trips)
- Reduces cost (fewer API calls)
- Information is still gathered, just more efficiently
**Potential Issues:**
- **UserIntention prompt complexity**: Adding 5 new fields (primaryGoal, dataType, expectedFormats, qualityRequirements, successCriteria) might make the prompt too complex
- **Prompt quality risk**: One prompt doing too much might reduce accuracy
- **Task-specific vs. workflow-level intent**: Task intent might differ from workflow intent (e.g., "Generate 2000 primes" vs. overall workflow goal)
**Recommendation**:
- ✅ Keep integration for workflow-level (userintention)
- ⚠️ Consider keeping task/action-level intent analysis separate OR make it optional in prompts
- Test prompt quality after integration
---
### 3. **JSON Completeness Check via Parsing**
**Rating**: ⭐⭐⭐⭐⭐ Excellent (CORRECTED)
**User Correction**: ❌ **NOT by last character check!** Last character could be `}` or `]` by chance, JSON still incomplete.
**Correct Approach**:
- **Use JSON parsing** to determine completeness
- If `json.loads()` succeeds → JSON is complete
- If `json.loads()` fails → JSON is incomplete/broken
- **Current implementation is WRONG** (line 860: `raw_normalized.endswith(('}', ']'))`)
**Why it's clever:**
- **Objective**: JSON either parses successfully or it doesn't
- **Reliable**: Handles nested structures, mid-string cuts, mid-number cuts
- **Debuggable**: Parsing errors clearly indicate where JSON is broken
**Benefit**: Predictable behavior, handles all edge cases correctly.
---
### 4. **New Continuation Summary Format**
**Rating**: ⭐⭐⭐⭐ Very Good
**Why it's clever:**
- **Clear progress indication**: "heading X with Y items" is more intuitive than "minSections: 5"
- **Contextual**: Shows what's actually been delivered, not abstract metrics
- **Actionable**: AI can see exactly what's missing
**Example format:**
```
Following data has already been delivered:
- heading "section_1" level 1: Introduction
- paragraph with 3 texts
- table "data_table" with 150 rows
- code_block "primes" with 2000 code lines
```
**User Clarifications**:
- ✅ **Missing IDs**: If section has no ID, **omit it** from summary (don't show "unknown")
- ✅ **Summary too long**: If summary exceeds token limit, truncate: show **first 100 items and last 100 items** (remove middle)
- ✅ **Format consistency**: Ensure consistent formatting across all section types
**Recommendation**:
- ✅ Implement ID check: Skip sections without ID
- ✅ Implement truncation: First 100 + last 100 items if > 200 items
---
### 5. **New Extraction Algorithm**
**Rating**: ⭐⭐⭐ Good (needs refinement)
**Why it's clever:**
- **Precise**: Finds exact cut-off point (section → element → sub-element)
- **Actionable**: Returns both cut-off element AND element before (gives AI context)
**Algorithm:**
1. Loop over all sections until finding incomplete section
2. In incomplete section, loop through elements until finding cut-off element
3. Return cut-off element AND element before it
**User Clarifications**:
- ✅ **Edge case - first element**: If cut-off is in first element, just show cut-off element (no element before exists)
- ✅ **Only one incomplete section**: There is always only **one section incomplete** (JSON cut-off point)
- ✅ **Mid-string/number cuts**: In 99% of cases, JSON is cut off mid-string or mid-number - deliver the cut-off part **as-is** (don't try to "complete" it)
- ✅ **Performance**: No problem - we only parse one AI response, not all accumulated sections
**Recommendation**:
- ✅ Handle first element edge case explicitly
- ✅ Extract cut-off element as-is (don't try to complete mid-string/number)
- ✅ Performance is fine (only parsing one response)
---
## ⚠️ **AREAS OF CONCERN**
### 1. **DoD Removal: Where Does Task Completion Checking Happen?**
**Rating**: ⚠️ Needs Clarification
**User Requirement**: Check deeply in code how completeness checks are handled
**Current State:**
- `_shouldContinueGeneration` already separates JSON completeness from DoD (line 916)
- `_analyzeTaskCompletion` exists but **verified: NOT called anywhere** in codebase
- Comment says "Action DoD is checked AFTER the AI Loop completes in _refineDecide"
- `_refineDecide` (modeDynamic.py line 693) uses:
- ContentValidator for content quality and requirements
- ProgressTracker for progress state
- Content validation and analysis (not DoD metrics)
**Findings**:
- ✅ `_analyzeTaskCompletion` is safe to remove (not called)
- ✅ DoD checking happens in refinement/validation phase (`_refineDecide`, ContentValidator)
- ✅ JSON completeness (parsing) is separate from task completion (validation)
**Recommendation**:
- ✅ Remove `_analyzeTaskCompletion` (verified safe)
- ✅ Ensure validation/refinement phase (`_refineDecide`, ContentValidator) still checks requirements
- ✅ Keep separation: JSON completeness (loop) vs. Task completion (refinement)
---
### 2. **UserIntention Prompt Complexity**
**Rating**: ⚠️ Moderate Risk
**Current**: UserIntention prompt does 4 things:
1. Language detection
2. Normalization
3. Intent extraction
4. Context item extraction
**Proposed**: Add 5 more things:
5. Primary goal
6. Data type
7. Expected formats
8. Quality requirements
9. Success criteria
**User Clarification**: ✅ **Adding 5 items is no problem for AI** - prompt complexity is acceptable
**Risk**:
- ~~Prompt bloat~~: User confirms this is acceptable
- ~~AI confusion~~: User confirms AI can handle this
- Token usage: Longer prompt = more tokens per call (acceptable trade-off)
**Mitigation Options**:
- ✅ User confirms: No mitigation needed, AI can handle it
- ✅ Keep integration as planned
**Recommendation**:
- ✅ Proceed with integration (user confirmed acceptable)
- ✅ Test after implementation to verify quality maintained
---
### 3. **Task-Specific vs. Workflow-Level Intent**
**Rating**: ⚠️ Needs Consideration
**Problem**:
- Workflow intent: "Generate comprehensive report" (high-level)
- Task intent: "Generate first 2000 prime numbers" (specific)
- Action intent: "Generate primes 1-1000" (very specific)
**Current Plan**:
- UserIntention → workflow-level intent ✅
- Taskplan → use workflow intent (no re-analysis) ⚠️
- Dynamic prompts → action-level intent analysis ✅
**Concern**:
- Task might need different dataType/expectedFormats than workflow
- Example: Workflow wants PDF, but task needs CSV for intermediate step
**User Clarification**: ✅ **YES, very important!** Allow taskplan to override workflow intent if task-specific needs differ
**Recommendation**:
- ✅ Keep workflow-level intent in userintention
- ✅ **CRITICAL**: Allow taskplan to override workflow intent (e.g., workflow wants PDF, task needs CSV)
- ✅ Keep action-level intent analysis in dynamic prompts (as planned)
- ✅ **CRITICAL**: Intent check should be different on workflow, task, and action levels (keep separate)
---
### 4. **Merge Logic Compatibility**
**Rating**: ✅ Low Risk
**Current Plan**: "Keep existing merge logic - it works well"
**Analysis**:
- ✅ Merge logic (`_mergeSectionsIntelligently`, `_mergeSectionContent`) is independent of extraction algorithm
- ✅ New extraction algorithm only affects continuation context, not merging
- ⚠️ Need to ensure extraction algorithm works with merged sections
**Recommendation**:
- ✅ Keep merge logic as-is
- ✅ Test extraction algorithm with merged sections
- ✅ Ensure extraction finds cut-off in merged structure correctly
---
### 5. **JSON Completeness Detection Edge Cases**
**Rating**: ⚠️ Needs Testing
**User Correction**: ❌ **Current check is WRONG!** `raw_normalized.endswith(('}', ']'))` is incorrect.
**Correct Approach**:
- **Use JSON parsing** (`json.loads()`) to determine completeness
- If parsing succeeds → JSON is complete
- If parsing fails → JSON is incomplete
- **Nested structures example**: `{"a": {"b": [1,2,3` should NOT pass parseJson (correctly fails)
**Current Code Issues**:
- Line 860: `raw_normalized.endswith(('}', ']'))` - **WRONG**, remove this check
- Line 864: `json.loads(extracted)` - **CORRECT**, use this as primary check
- Line 880: `except json.JSONDecodeError` - **CORRECT**, handles parsing failures
**Recommendation**:
- ✅ Remove last character check (line 860)
- ✅ Rely only on JSON parsing (`json.loads()`)
- ✅ Parsing handles all edge cases correctly (nested structures, mid-string cuts, etc.)
---
## 📊 **RISK ASSESSMENT**
| Aspect | Risk Level | Mitigation |
|--------|------------|------------|
| DoD removal | 🟢 Low | Verify checking happens in refinement phase |
| Intent integration | 🟡 Medium | Test prompt quality, have fallback plan |
| Extraction algorithm | 🟡 Medium | Handle edge cases, add tests |
| JSON completeness | 🟢 Low | Current implementation is robust |
| Merge compatibility | 🟢 Low | Merge logic is independent |
---
## 🎯 **RECOMMENDATIONS**
### High Priority
1. ✅ **Verify DoD checking location**: ✅ Confirmed in refinement phase (`_refineDecide`, ContentValidator)
2. ✅ **Test UserIntention prompt quality**: User confirmed acceptable, proceed with integration
3. ✅ **Handle extraction edge cases**: ✅ Clarified - first element (show only cut-off), only one incomplete section, deliver mid-string/number as-is
4. ✅ **Fix JSON completeness check**: ✅ Remove last character check, use parsing only
5. ✅ **Implement task-level intent override**: ✅ Critical - allow taskplan to override workflow intent
### Medium Priority
4. ✅ **Prompt splitting**: User confirmed not needed - AI can handle 5 additional fields
5. ✅ **Task-level intent override**: ✅ Critical requirement - must implement
6. ✅ **Summary truncation**: ✅ Clarified - show first 100 and last 100 items if too long
7. ✅ **Omit sections without ID**: ✅ Clarified - skip sections without ID in summary
### Low Priority
7. ✅ **Performance optimization**: Early exit in extraction algorithm if cut-off found early
8. ✅ **Add comprehensive tests**: Unit tests for all edge cases
9. ✅ **Documentation**: Update docs to explain new loop behavior
---
## 💡 **FINAL VERDICT**
**Overall**: ✅ **APPROVE WITH MODIFICATIONS**
**Strengths**:
- Clear separation of concerns (JSON completeness vs. task completion)
- Performance improvement (3 fewer AI calls)
- Simpler, more maintainable code
- Better continuation prompts (section counts vs. abstract metrics)
**Required Modifications** (UPDATED):
1. ✅ **Fix JSON completeness check**: Remove last character check, use parsing only
2. ✅ **Verify DoD checking**: Confirmed in refinement phase (`_refineDecide`, ContentValidator)
3. ✅ **UserIntention prompt**: User confirmed acceptable, proceed with integration
4. ✅ **Extraction algorithm**: Handle first element edge case, deliver mid-string/number as-is
5. ✅ **Task-level intent override**: Critical requirement - must implement
6. ✅ **Summary handling**: Omit sections without ID, truncate if too long (first 100 + last 100)
7. ✅ **Intent levels**: Keep separate checks for workflow, task, and action levels
**Risk Level**: 🟡 **Medium** - Well-reasoned proposal with manageable risks
---
## 🔍 **QUESTIONS ANSWERED**
1. ✅ **Where is `_analyzeTaskCompletion` called?****ANSWERED**: Not called anywhere, safe to remove
2. ✅ **Does refinement phase check DoD?****ANSWERED**: Yes, in `_refineDecide` via ContentValidator
3. ✅ **What happens if JSON is complete but wrong?****ANSWERED**: Validation happens in refinement phase
4. ✅ **Can tasks override workflow intent?****ANSWERED**: Yes, critical requirement - must implement
5. ✅ **What's the token limit for continuation summary?****ANSWERED**: Truncate if > 200 items (first 100 + last 100)
6. ✅ **How to check JSON completeness?****ANSWERED**: Use parsing (`json.loads()`), NOT last character check
7. ✅ **What if section has no ID?****ANSWERED**: Omit it from summary
8. ✅ **What if cut-off in first element?****ANSWERED**: Show only cut-off element (no element before)
9. ✅ **How many sections incomplete?****ANSWERED**: Always only one section incomplete
10. ✅ **What about mid-string/number cuts?****ANSWERED**: Deliver as-is (99% of cases)
---
## 📝 **CONCLUSION**
The proposal is **architecturally sound** and addresses real problems (complex DoD logic, multiple AI calls). The core idea of separating JSON completeness from task completion is **excellent design**.
Main concerns (UPDATED):
- ✅ **Prompt complexity** → User confirmed acceptable, proceed
- ✅ **Edge cases in extraction** → Clarified and addressed
- ✅ **Task-level intent** → Critical requirement, must implement override
- ✅ **JSON completeness check** → Must fix (remove last char check, use parsing)
- ✅ **Summary handling** → Clarified (omit no-ID sections, truncate if long)
**Recommendation**: ✅ **Proceed with implementation** - all concerns addressed and clarified by user.

View file

@ -0,0 +1,629 @@
# MCP (Model Context Protocol) Architecture Analysis
## MCP Overview
**Model Context Protocol (MCP)** is an open standard introduced by Anthropic (November 2024) that provides a standardized way for AI systems to interact with external tools, data sources, and systems.
### Core MCP Concepts
1. **MCP Server**: Provides capabilities (tools, resources, prompts) to AI clients
2. **MCP Client**: AI system that uses MCP servers to access external capabilities
3. **Tools**: Executable functions that the AI can call (similar to actions)
4. **Resources**: Readable data sources (files, databases, APIs)
5. **Prompts**: Pre-defined prompt templates with placeholders
### MCP Communication Model
- **Protocol**: JSON-RPC 2.0 over stdio, HTTP, or WebSocket
- **Request/Response**: Client sends requests, server responds
- **Discovery**: Client discovers available tools/resources/prompts from server
- **Execution**: Client calls tools, reads resources, uses prompts
### ✅ MCP is Model-Agnostic
**Critical Point**: MCP is **NOT** limited to Anthropic's Claude models. It is designed as an **open, model-agnostic standard** that works with:
- ✅ **Anthropic Claude** (Claude 3.5 Sonnet, Opus, Haiku)
- ✅ **OpenAI GPT** (GPT-4, GPT-4 Turbo, GPT-3.5)
- ✅ **Google Gemini** (Gemini Pro, Gemini Ultra)
- ✅ **Other LLM Providers** (any provider that supports function calling/tool use)
- ✅ **Multi-Model Systems** (systems that dynamically select models)
**How It Works**:
- MCP servers are **independent** of the AI model
- MCP clients can be implemented for **any AI provider**
- The protocol standardizes **tool/resource interfaces**, not model-specific APIs
- Your dynamic model selection can work **through** MCP, not **instead of** it
**Your Architecture Compatibility**:
- ✅ **Dynamic Model Selection**: MCP works with your per-call model selection
- ✅ **Failover Mechanism**: MCP servers don't care which model calls them
- ✅ **Model-Aware Chunking**: MCP tools receive content, model selection happens before MCP call
- ✅ **Operation Type Selection**: MCP tools can be selected based on `operationType` (same as current actions)
---
## Current Architecture vs MCP
### Current Architecture
**Structure**:
```
User Request
WorkflowProcessor
Mode (Dynamic/Actionplan)
ActionExecutor
Methods (methodAi, methodOutlook, methodSharepoint)
Actions (process, readEmails, uploadFiles)
Services (aiService, chatService, generationService)
```
**Key Characteristics**:
- **Action-based**: Actions are discovered dynamically via `@action` decorator
- **Service-oriented**: Services provide capabilities (AI, chat, generation, extraction)
- **Workflow-driven**: Sequential execution with state management
- **Type-safe**: Pydantic models for all parameters/returns
- **Two-stage planning**: Stage 1 (action selection) + Stage 2 (parameter generation)
### MCP Architecture
**Structure**:
```
AI Client (Any LLM: Claude, GPT-4, Gemini, etc.)
MCP Client Library (model-agnostic)
MCP Server (provides tools/resources/prompts)
External System (database, API, file system, etc.)
```
**Key Characteristics**:
- **Model-Agnostic**: Works with any AI provider (not just Anthropic)
- **Tool-based**: Tools are registered with MCP server
- **Resource-based**: Resources provide read-only data access
- **Prompt-based**: Pre-defined prompts with placeholders
- **Discovery-driven**: Client discovers capabilities at runtime
- **Standardized**: JSON-RPC protocol for all communication
---
## Compatibility Analysis
### ✅ Compatible Aspects
1. **Tool/Action Equivalence**:
- **MCP Tools** ≈ **Current Actions**
- Both are executable functions with parameters
- Both support discovery and execution
- Both can return results
2. **Resource/Document Equivalence**:
- **MCP Resources** ≈ **Current Document References**
- Both provide read-only data access
- Both support discovery and reading
- Both can be referenced by URI/identifier
3. **Type Safety**:
- **MCP**: Uses JSON Schema for tool parameters
- **Current**: Uses Pydantic models for action parameters
- **Compatibility**: Both provide type validation
4. **Modularity**:
- **MCP**: Servers are modular and composable
- **Current**: Methods are modular and composable
- **Compatibility**: Both support modular architecture
### ⚠️ Incompatible Aspects
1. **Execution Model**:
- **MCP**: Client-driven (AI decides which tools to call)
- **Current**: Workflow-driven (predefined action sequence)
- **Conflict**: MCP assumes AI autonomy, current system uses planning
2. **Planning vs Execution**:
- **MCP**: AI directly calls tools based on user request
- **Current**: Two-stage planning (select action → generate parameters → execute)
- **Conflict**: MCP doesn't have planning phase
3. **State Management**:
- **MCP**: Stateless tool calls
- **Current**: Stateful workflow (rounds, tasks, actions)
- **Conflict**: MCP doesn't track workflow state
4. **Service Dependencies**:
- **MCP**: Servers are independent
- **Current**: Services have dependencies (chatService → aiService)
- **Conflict**: MCP assumes flat server structure
---
## Integration Possibilities
### Option 1: MCP as Action Backend (Recommended)
**Concept**: Expose current actions as MCP tools, but keep workflow planning
**Architecture**:
```
User Request
WorkflowProcessor (planning phase - unchanged)
ActionExecutor
Model Selection (dynamic - based on operationType, priority, etc.)
├─> Select Model: GPT-4, Claude, Gemini, etc. (unchanged)
MCP Client (model-agnostic - works with any selected model)
MCP Servers (wrapping current methods as tools)
├─> MCP Server: "ai" (provides ai.process, ai.webResearch, etc.)
├─> MCP Server: "outlook" (provides outlook.readEmails, etc.)
└─> MCP Server: "sharepoint" (provides sharepoint.uploadFiles, etc.)
Methods (unchanged implementation)
```
**Benefits**:
- ✅ Keep existing workflow planning
- ✅ **Keep dynamic model selection** (MCP is model-agnostic)
- ✅ Standardize action interface (MCP tools)
- ✅ Enable external MCP servers (third-party tools)
- ✅ Maintain type safety (MCP JSON Schema ↔ Pydantic)
- ✅ **Model selection happens before MCP call** (MCP doesn't care which model)
**Implementation**:
```python
# MCP Server wrapper for methods
class MethodMcpServer:
"""MCP server that exposes method actions as tools (model-agnostic)"""
def __init__(self, method: MethodBase):
self.method = method
self.tools = self._discoverTools()
def _discoverTools(self) -> List[McpTool]:
"""Convert @action methods to MCP tools"""
tools = []
for actionName, actionInfo in self.method.actions.items():
tool = McpTool(
name=f"{self.method.name}.{actionName}",
description=actionInfo['description'],
inputSchema=self._pydanticToJsonSchema(actionInfo['parameters'])
)
tools.append(tool)
return tools
async def callTool(self, name: str, arguments: Dict[str, Any]) -> Any:
"""Execute action via MCP tool call (model-agnostic)"""
# MCP server doesn't know/care which AI model called it
methodName, actionName = name.split('.', 1)
return await self.method.actions[actionName]['method'](arguments)
# MCP Client with dynamic model selection
class McpClientWithModelSelection:
"""MCP client that supports dynamic model selection"""
def __init__(self, aiObjects: Any, mcpServers: List[MethodMcpServer]):
self.aiObjects = aiObjects # Your existing aiObjects (handles model selection)
self.mcpServers = mcpServers
async def callTool(
self,
toolName: str,
arguments: Dict[str, Any],
operationType: OperationTypeEnum,
options: AiCallOptions
) -> Any:
"""Call MCP tool with dynamic model selection"""
# 1. Select model (your existing logic - unchanged)
selectedModel = self.aiObjects.selectModel(
operationType=operationType,
priority=options.priority,
contentType=options.contentType
)
# 2. Call MCP tool (model-agnostic - works with any model)
server = self._findServerForTool(toolName)
result = await server.callTool(toolName, arguments)
# 3. Model selection and failover handled by aiObjects (unchanged)
return result
```
---
### Option 2: MCP as External Tool Integration
**Concept**: Use MCP to integrate external tools, keep internal actions as-is
**Architecture**:
```
User Request
WorkflowProcessor
ActionExecutor
├─> Internal Actions (unchanged - methodAi, methodOutlook, etc.)
└─> External MCP Tools (new - via MCP client)
├─> MCP Server: "slack" (external)
├─> MCP Server: "github" (external)
└─> MCP Server: "database" (external)
```
**Benefits**:
- ✅ Keep existing architecture unchanged
- ✅ Add external tools via MCP
- ✅ Standardized interface for external integrations
**Implementation**:
```python
# Add MCP client to ActionExecutor
class ActionExecutor:
def __init__(self, services, mcpClients: List[McpClient]):
self.services = services
self.mcpClients = mcpClients # External MCP servers
async def executeAction(self, action: str, parameters: Dict):
# Check if action is internal
if '.' in action and action.split('.')[0] in methods:
return await self._executeInternalAction(action, parameters)
# Check if action is external MCP tool
for mcpClient in self.mcpClients:
if mcpClient.hasTool(action):
return await mcpClient.callTool(action, parameters)
raise ValueError(f"Unknown action: {action}")
```
---
### Option 3: Hybrid Approach (Best of Both)
**Concept**: Internal actions remain direct, external tools via MCP, unified interface
**Architecture**:
```
User Request
WorkflowProcessor
ActionExecutor
├─> Internal Actions (direct method calls - fast, type-safe)
└─> External Tools (MCP client - standardized, extensible)
```
**Benefits**:
- ✅ Keep internal actions fast (no MCP overhead)
- ✅ Standardize external tool integration
- ✅ Unified action interface for planning
---
## Detailed Comparison
### Tool/Action Discovery
**MCP**:
```json
{
"tools": [
{
"name": "read_file",
"description": "Read a file",
"inputSchema": {
"type": "object",
"properties": {
"path": {"type": "string"}
}
}
}
]
}
```
**Current**:
```python
@action
async def process(parameters: AiProcessParameters) -> ActionResult:
"""AI processing action"""
# Action discovered via @action decorator
# Parameters: Pydantic model (AiProcessParameters)
```
**Compatibility**: ✅ **High** - Both support discovery and type validation
---
### Tool/Action Execution
**MCP**:
```json
{
"method": "tools/call",
"params": {
"name": "read_file",
"arguments": {"path": "/tmp/file.txt"}
}
}
```
**Current**:
```python
result = await executeAction(
methodName="ai",
actionName="process",
selection=ActionDefinition(
action="ai.process",
parameters={"aiPrompt": "...", "contentParts": [...]}
)
)
```
**Compatibility**: ✅ **High** - Both execute functions with parameters
---
### Resource/Document Access
**MCP**:
```json
{
"resources": [
{
"uri": "file:///tmp/document.pdf",
"name": "Document PDF",
"mimeType": "application/pdf"
}
]
}
```
**Current**:
```python
documentList = DocumentReferenceList([
DocumentListReference(label="task1_results"),
DocumentItemReference(documentId="doc_123")
])
```
**Compatibility**: ⚠️ **Medium** - Different reference models, but both provide data access
---
### Planning vs Direct Execution
**MCP**:
```
User: "Read file X and summarize it"
AI: [Discovers tools] → [Calls read_file] → [Calls summarize]
Result: Summary
```
**Current**:
```
User: "Read file X and summarize it"
Planning: [Stage 1: Select action] → [Stage 2: Generate parameters]
Execution: [Execute action with parameters]
Result: Summary
```
**Compatibility**: ⚠️ **Low** - Different execution models
---
## Integration Strategy
### Recommended: Option 1 (MCP as Action Backend)
**Why**:
1. **Standardization**: Actions become MCP tools (standard interface)
2. **Extensibility**: Can add external MCP servers easily
3. **Compatibility**: Keep existing workflow planning
4. **Type Safety**: MCP JSON Schema ↔ Pydantic models
**Implementation Steps**:
1. **Create MCP Server Wrapper**:
```python
class MethodMcpServer:
"""Wraps method actions as MCP tools"""
def __init__(self, method: MethodBase):
self.method = method
def getTools(self) -> List[McpTool]:
"""Convert @action methods to MCP tools"""
# Discover actions via @action decorator
# Convert Pydantic models to JSON Schema
# Return MCP tool definitions
```
2. **Create MCP Client in ActionExecutor**:
```python
class ActionExecutor:
def __init__(self, services, mcpServers: List[MethodMcpServer]):
self.services = services
self.mcpServers = mcpServers
async def executeAction(self, action: str, parameters: Dict):
# Find MCP server that provides this tool
server = self._findServerForAction(action)
return await server.callTool(action, parameters)
```
3. **Convert Pydantic to JSON Schema**:
```python
def pydanticToJsonSchema(model: Type[BaseModel]) -> Dict:
"""Convert Pydantic model to JSON Schema for MCP"""
# Use pydantic-to-json-schema library
return json_schema(model)
```
4. **Keep Workflow Planning**:
- Stage 1/Stage 2 planning remains unchanged
- Actions are discovered via MCP tool discovery
- Parameters validated via MCP JSON Schema
---
## Benefits of MCP Integration
### 1. Standardization
- ✅ Actions become standard MCP tools
- ✅ Consistent interface across all actions
- ✅ Standardized error handling
### 2. Extensibility
- ✅ Easy to add external MCP servers
- ✅ Third-party tools can be integrated
- ✅ No custom integration code needed
### 3. Interoperability
- ✅ Compatible with other MCP clients
- ✅ Can be used by external AI systems
- ✅ Standard protocol (JSON-RPC)
### 4. Tool Discovery
- ✅ Dynamic tool discovery
- ✅ Runtime capability detection
- ✅ No hardcoded action lists
---
## Challenges and Considerations
### 1. Planning vs Direct Execution
- **Challenge**: MCP assumes AI directly calls tools, current system uses planning
- **Solution**: Keep planning phase, use MCP for execution only
### 2. State Management
- **Challenge**: MCP is stateless, current system is stateful
- **Solution**: State management remains in WorkflowProcessor, MCP tools are stateless
### 3. Type Conversion
- **Challenge**: Pydantic models ↔ JSON Schema conversion
- **Solution**: Use existing libraries (pydantic-to-json-schema)
### 4. Performance
- **Challenge**: MCP adds JSON-RPC overhead
- **Solution**: Option 3 (hybrid) - internal actions direct, external via MCP
---
## Dynamic Model Selection with MCP
### How MCP Works with Multiple Models
**Key Insight**: MCP is **completely model-agnostic**. The protocol standardizes the **interface** between AI systems and tools, not the AI model itself.
**Your Current Architecture**:
```python
# Current: Dynamic model selection per call
selectedModel = aiObjects.selectModel(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=options.priority,
contentType=options.contentType
)
result = await aiObjects.call(request, selectedModel)
```
**With MCP**:
```python
# MCP: Model selection happens BEFORE MCP call
selectedModel = aiObjects.selectModel(
operationType=OperationTypeEnum.DATA_EXTRACT,
priority=options.priority,
contentType=options.contentType
)
# MCP tool call (model-agnostic - doesn't care which model)
mcpResult = await mcpClient.callTool(
toolName="ai.process",
arguments={"aiPrompt": "...", "contentParts": [...]},
selectedModel=selectedModel # Passed for logging/tracking, not protocol requirement
)
```
**Important Points**:
1. ✅ **MCP servers don't know which model calls them** - they're model-agnostic
2. ✅ **Model selection happens in your system** - before MCP tool call
3. ✅ **Failover works the same way** - if model fails, select next model, retry MCP call
4. ✅ **Model-aware chunking** - happens before MCP call (chunking is your system's concern)
5. ✅ **MCP just standardizes tool interface** - doesn't care about model selection logic
### Model Selection Flow with MCP
```
User Request
WorkflowProcessor
ActionExecutor
[Model Selection Logic - YOUR SYSTEM]
├─> Select model based on:
│ - operationType (DATA_EXTRACT, DOCUMENT_GENERATE, etc.)
│ - priority (high, medium, low)
│ - contentType (text, image, etc.)
│ - Model capabilities (contextLength, maxTokens)
│ - Failover chain (if first model fails)
[MCP Tool Call - MODEL-AGNOSTIC]
├─> Call MCP tool with selected model
├─> MCP server executes tool (doesn't care which model)
└─> Return result
[If Model Fails]
├─> Select next model from failover chain
└─> Retry MCP tool call with new model
```
---
## Conclusion
### Compatibility Assessment
**Overall**: ✅ **Highly Compatible** with some architectural adaptations
**Key Findings**:
1. ✅ **Actions ≈ MCP Tools**: Direct mapping possible
2. ✅ **Documents ≈ MCP Resources**: Similar concepts
3. ✅ **Dynamic Model Selection**: MCP is model-agnostic - works with any model
4. ✅ **Multi-Provider Support**: MCP works with OpenAI, Anthropic, Google, etc.
5. ⚠️ **Planning Phase**: MCP doesn't have planning, but can be kept
6. ⚠️ **State Management**: MCP is stateless, but state can remain in workflow
### Recommended Integration
**Option 1: MCP as Action Backend** (Recommended)
- Expose actions as MCP tools
- Keep workflow planning unchanged
- Enable external MCP server integration
- Maintain type safety
**Benefits**:
- Standardized action interface
- Easy external tool integration
- Compatible with MCP ecosystem
- Minimal changes to existing architecture
**Next Steps**:
1. Create MCP server wrapper for methods
2. Convert Pydantic models to JSON Schema
3. Add MCP client to ActionExecutor
4. Test with external MCP servers

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,383 @@
# MCP for Outlook/SharePoint: Can MCP Servers Replace Your Custom Actions?
## Your Question
**Can you use MCP server functionality to access Outlook emails and SharePoint data, eliminating the need for custom `methodOutlook` and `methodSharepoint` actions?**
**Answer**: ✅ **Yes, but with important considerations**
---
## How MCP Accesses Outlook/SharePoint
### MCP Server Architecture
```
Your System
↓ (MCP Client)
MCP Server (OutlookMCPServer / Microsoft 365 MCP Server)
↓ (Microsoft Graph API + OAuth)
Microsoft 365 (Outlook, SharePoint, OneDrive, Teams)
```
### Authentication Flow
**MCP servers use the same authentication as your current methods**:
1. **OAuth 2.0 Flow**:
- User grants permissions via Azure AD
- Access token obtained (same as your `connectionReference`)
- Token used for Microsoft Graph API calls
2. **Permissions Required**:
- Outlook: `Mail.ReadWrite`, `Mail.Send`, `Mail.ReadWrite.Shared`
- SharePoint: `Sites.ReadWrite.All`, `Files.ReadWrite.All`
**Key Point**: MCP servers need the **same OAuth setup** as your current methods.
---
## Current Implementation vs MCP Server
### Your Current Implementation
**`methodOutlook.py`**:
```python
class MethodOutlook(MethodBase):
def _getMicrosoftConnection(self, connectionReference: str):
# Get connection from your system
userConnection = self.services.chat.getUserConnectionFromConnectionReference(connectionReference)
# Get fresh token
token = self.services.chat.getFreshConnectionToken(userConnection.id)
return {
"accessToken": token.tokenAccess,
"refreshToken": token.tokenRefresh,
"scopes": ["Mail.ReadWrite", "Mail.Send", ...]
}
@action
async def readEmails(self, parameters: Dict) -> ActionResult:
connection = self._getMicrosoftConnection(parameters["connectionReference"])
# Direct Microsoft Graph API call
response = requests.get(
f"https://graph.microsoft.com/v1.0/me/messages",
headers={"Authorization": f"Bearer {connection['accessToken']}"}
)
# Process and return ActionResult
```
**Characteristics**:
- ✅ Uses your existing `connectionReference` system
- ✅ Integrates with your `UserConnection` management
- ✅ Returns `ActionResult` (compatible with your workflow)
- ✅ Custom actions tailored to your needs
---
### MCP Server Implementation
**OutlookMCPServer** (external MCP server):
```python
# MCP Server exposes tools like:
tools = [
{
"name": "outlook.readEmails",
"description": "Read emails from Outlook",
"inputSchema": {
"type": "object",
"properties": {
"folder": {"type": "string"},
"limit": {"type": "integer"}
}
}
},
{
"name": "outlook.sendEmail",
"description": "Send email via Outlook",
"inputSchema": {...}
}
]
# MCP Server handles authentication internally
# Uses Microsoft Graph API (same as your methods)
```
**Characteristics**:
- ✅ Standardized MCP protocol
- ✅ Pre-built tools (no custom code needed)
- ⚠️ Needs separate OAuth setup (not integrated with your `connectionReference`)
- ⚠️ Returns MCP format (needs conversion to `ActionResult`)
---
## Comparison: Custom Actions vs MCP Servers
### Scenario: Read Outlook Emails
#### Option A: Your Custom Action (Current)
```python
# In your workflow
result = await executeAction("outlook.readEmails", {
"connectionReference": "conn_msft_123", # Your connection system
"folder": "Inbox",
"limit": 10
})
# Returns: ActionResult with ActionDocument[]
# Integrated with your connection management
# Works seamlessly with your workflow
```
**Pros**:
- ✅ Integrated with your `connectionReference` system
- ✅ Uses your existing `UserConnection` management
- ✅ Returns `ActionResult` (native format)
- ✅ Customizable to your specific needs
- ✅ Token refresh handled by your system
**Cons**:
- ❌ You maintain the code
- ❌ Need to implement all actions yourself
---
#### Option B: MCP Server (Alternative)
```python
# Connect to external MCP server
outlookMcpClient = ExternalMcpClient("http://outlook-mcp-server:8080")
# Call MCP tool
mcpResult = await outlookMcpClient.callTool("outlook.readEmails", {
"folder": "Inbox",
"limit": 10
# Note: No connectionReference - MCP server handles auth internally
})
# Convert MCP result to ActionResult
result = mcpResultToActionResult(mcpResult)
```
**Pros**:
- ✅ No custom code to maintain
- ✅ Pre-built tools (readEmails, sendEmail, etc.)
- ✅ Standardized protocol
- ✅ Can use multiple MCP servers (Outlook, SharePoint, etc.)
**Cons**:
- ❌ **Separate OAuth setup** (not integrated with your `connectionReference`)
- ❌ **Different authentication flow** (MCP server manages tokens)
- ❌ Needs conversion from MCP format to `ActionResult`
- ❌ Less control over implementation
- ❌ May not match your exact requirements
---
## Critical Question: Authentication
### How Does MCP Server Access Your Data?
**MCP servers need authentication**, just like your current methods:
1. **Option 1: MCP Server Manages OAuth**
- User authenticates with MCP server
- MCP server stores tokens
- **Problem**: Separate from your `connectionReference` system
- **Problem**: User needs to authenticate twice (your system + MCP server)
2. **Option 2: Pass Tokens to MCP Server**
- Your system gets token (via `connectionReference`)
- Pass token to MCP server
- **Problem**: MCP protocol doesn't standardize token passing
- **Problem**: Security concern (passing tokens)
3. **Option 3: Custom MCP Server Using Your Connection System**
- Create MCP server wrapper around your methods
- Uses your `connectionReference` system
- **This is what the original proposal suggested** (but adds overhead)
---
## Practical Assessment
### Can You Replace Your Actions with MCP Servers?
**Short Answer**: **Technically yes, but not recommended** for these reasons:
### ❌ Problem 1: Authentication Duplication
**Your System**:
```python
# User authenticates once
userConnection = createUserConnection("msft", oauthFlow)
# Stored in your system, reusable across all actions
```
**MCP Server**:
```python
# User needs to authenticate again
# MCP server manages its own tokens
# Not integrated with your connectionReference system
```
**Impact**: Users authenticate twice, tokens managed separately
---
### ❌ Problem 2: Integration Complexity
**Your Current Flow**:
```
Workflow → ActionExecutor → methodOutlook.readEmails()
Uses connectionReference → Gets token → Calls Graph API
Returns ActionResult (native format)
```
**MCP Flow**:
```
Workflow → ActionExecutor → MCP Client → MCP Server
MCP Server manages auth → Calls Graph API
Returns MCP format → Convert to ActionResult
```
**Impact**: More layers, more complexity, more points of failure
---
### ❌ Problem 3: Loss of Control
**Your Custom Actions**:
- ✅ Full control over implementation
- ✅ Customize to your exact needs
- ✅ Integrate with your workflow seamlessly
- ✅ Use your connection management
**MCP Servers**:
- ❌ Limited to what MCP server provides
- ❌ Can't customize easily
- ❌ Need to adapt to MCP format
- ❌ Separate authentication system
---
## When MCP Servers Make Sense
### ✅ Use Case 1: External Tools You Don't Want to Maintain
**Example**: Slack, GitHub, Postgres MCP servers
```python
# Use external MCP server for Slack (you don't have Slack actions)
slackMcpClient = ExternalMcpClient("http://slack-mcp-server:8080")
result = await slackMcpClient.callTool("slack.sendMessage", {...})
```
**Value**: ✅ No need to build/maintain Slack integration
---
### ✅ Use Case 2: Standard Tools with Standard Authentication
**Example**: Public APIs with API keys (not OAuth)
```python
# Use MCP server for public API (simple API key auth)
weatherMcpClient = ExternalMcpClient("http://weather-mcp-server:8080")
result = await weatherMcpClient.callTool("weather.getForecast", {...})
```
**Value**: ✅ Simple integration, no complex auth
---
### ❌ Don't Use MCP Servers When:
1. **You already have working custom actions** (like Outlook/SharePoint)
2. **Authentication is complex** (OAuth with your connection system)
3. **You need tight integration** with your workflow
4. **You need customization** beyond what MCP server provides
---
## Recommendation
### For Outlook/SharePoint: Keep Your Custom Actions
**Why**:
1. ✅ **Already working**: Your `methodOutlook` and `methodSharepoint` work well
2. ✅ **Integrated authentication**: Uses your `connectionReference` system
3. ✅ **Native format**: Returns `ActionResult` directly
4. ✅ **Customizable**: Tailored to your specific needs
5. ✅ **No duplication**: Single authentication flow
**Don't replace with MCP servers** because:
- ❌ Would require separate OAuth setup
- ❌ Would need format conversion
- ❌ Would lose integration with your connection system
- ❌ Adds complexity without clear benefit
---
### Use MCP Servers For:
1. **New external tools** you don't want to build (Slack, GitHub, etc.)
2. **Simple integrations** with standard APIs
3. **Tools with simple authentication** (API keys, not OAuth)
---
## Alternative: Hybrid Approach
### Use MCP Servers for New Tools, Keep Custom Actions for Existing
```python
class ActionExecutor:
async def executeAction(self, methodName: str, actionName: str, parameters: Dict):
# Check if it's a custom action (Outlook, SharePoint, AI)
if methodName in ["outlook", "sharepoint", "ai"]:
# Use your custom actions (existing code)
return await self._executeCustomAction(methodName, actionName, parameters)
# Check if it's an external MCP tool
elif methodName in self.mcpClients:
# Use MCP server
mcpClient = self.mcpClients[methodName]
toolName = f"{methodName}.{actionName}"
mcpResult = await mcpClient.callTool(toolName, parameters)
return self._mcpResultToActionResult(mcpResult)
else:
raise ValueError(f"Unknown method: {methodName}")
```
**Benefits**:
- ✅ Keep existing Outlook/SharePoint actions (working, integrated)
- ✅ Use MCP servers for new external tools (Slack, GitHub, etc.)
- ✅ Best of both worlds
---
## Conclusion
**Can you use MCP servers for Outlook/SharePoint?**
- ✅ **Technically yes** - MCP servers exist and can access Outlook/SharePoint
- ❌ **Practically no** - Your custom actions are better integrated
**How does MCP access your data?**
- MCP servers use **Microsoft Graph API + OAuth** (same as your methods)
- But they need **separate OAuth setup** (not integrated with your `connectionReference`)
**Recommendation**:
- ✅ **Keep your custom Outlook/SharePoint actions** (they're better integrated)
- ✅ **Use MCP servers for new external tools** (Slack, GitHub, etc.)
- ✅ **Hybrid approach**: Custom actions for existing, MCP for new
**The real value of MCP**: Easy integration of **external tools you don't want to build**, not replacing **working custom actions**.

View file

@ -0,0 +1,290 @@
# MCP Value Analysis: When Does MCP Actually Add Value?
## Critical Question
**Does MCP enable AI calls to automatically call MCP servers, or is MCP independent of AI calls?**
**Answer**: MCP is **independent** of AI calls, but its **primary value** is when AI models can discover and call tools directly.
---
## How MCP Actually Works
### MCP's Design Purpose
MCP is designed for **direct AI-to-tool communication**:
```
AI Model (Claude Desktop, GPT-4, etc.)
↓ (discovers tools via MCP)
MCP Server (provides tools)
↓ (executes tool)
External System (database, API, file system)
```
**Key Point**: The **AI model itself** discovers and calls tools directly, not a separate planning system.
### Example: Claude Desktop with MCP
```python
# User asks Claude Desktop: "Read file X and summarize it"
# Claude Desktop (AI model):
# 1. Discovers available MCP tools via MCP protocol
tools = mcpClient.listTools()
# Returns: ["file.read", "file.write", "slack.sendMessage", ...]
# 2. AI decides to call file.read tool
result = await mcpClient.callTool("file.read", {"path": "file.txt"})
# 3. AI uses result to generate response
response = await aiModel.generate(f"Summarize: {result['content']}")
```
**The AI model directly calls tools** - no separate planning phase.
---
## Your Current Architecture vs MCP
### Your Architecture: Two-Phase Planning + Execution
```
User Request
Planning Phase (AI decides what to do)
├─> Stage 1: Select action ("ai.process")
└─> Stage 2: Generate parameters
Execution Phase (System executes, not AI)
├─> ActionExecutor.executeAction()
└─> Method.action() executes
```
**Key Difference**:
- **Your system**: AI **plans**, then **system executes**
- **MCP design**: AI **plans AND executes** directly
### The Disconnect
**MCP's value is when AI directly calls tools**, but your architecture separates planning from execution:
1. **Planning**: AI selects actions (via `_planSelect()`)
2. **Execution**: System executes actions (via `ActionExecutor`)
**MCP doesn't fit this pattern** because:
- MCP assumes AI calls tools directly
- Your system has AI plan, then system executes
- Adding MCP would just add overhead without changing the flow
---
## When MCP Actually Adds Value
### ✅ Scenario 1: External AI Systems Call Your Tools
**Use Case**: External AI systems (Claude Desktop, GPT-4, etc.) want to use your actions as tools.
**Value**: ✅ **High** - Enables external AI systems to use your capabilities
**Example**:
```python
# External Claude Desktop connects to your MCP server
# User asks Claude: "Process document X using PowerOn AI"
# Claude Desktop discovers your tools via MCP
tools = await yourMcpServer.listTools()
# Returns: ["poweron.ai.process", "poweron.outlook.readEmails", ...]
# Claude directly calls your tool
result = await yourMcpServer.callTool("poweron.ai.process", {
"aiPrompt": "Summarize document",
"documentList": ["doc_123"]
})
# Claude uses result in its response
```
**This is valuable** if you want to expose your actions to external AI systems.
---
### ✅ Scenario 2: Your AI Directly Calls Tools (No Planning Phase)
**Use Case**: Change your architecture so AI directly calls tools instead of planning first.
**Value**: ⚠️ **Medium** - Requires architectural change, but enables direct tool calling
**Example**:
```python
# Instead of planning phase, AI directly calls tools
userRequest = "Summarize document X"
# AI discovers tools
tools = mcpClient.listTools()
# AI decides and calls tool directly
result = await mcpClient.callTool("ai.process", {
"aiPrompt": "Summarize the document",
"documentList": ["doc_123"]
})
# No separate planning phase - AI does it all
```
**This would require**:
- Removing planning phase (`_planSelect`, `_actExecute`)
- Making AI directly call tools
- Significant architectural change
**Not recommended** - your planning phase provides value (two-stage parameter generation, etc.)
---
### ✅ Scenario 3: Use External MCP Servers as Tools
**Use Case**: Use external MCP servers (Slack, GitHub, Postgres) as tools in your system.
**Value**: ✅ **High** - Easy integration of external tools
**Example**:
```python
# Connect to external Slack MCP server
slackMcpClient = ExternalMcpClient("http://slack-mcp-server:8080")
# Use Slack as a tool in your actions
@action
async def sendSlackNotification(self, parameters: Dict) -> ActionResult:
# Call external MCP tool
result = await slackMcpClient.callTool("slack.sendMessage", {
"channel": parameters["channel"],
"text": parameters["text"]
})
return ActionResult(success=True, ...)
```
**This is valuable** - easy integration of external services without custom connectors.
---
### ❌ Scenario 4: Wrap Internal Actions as MCP Tools (Current Proposal)
**Use Case**: Wrap your existing actions as MCP tools, but keep same architecture.
**Value**: ❌ **Low** - Adds overhead without changing functionality
**Why it doesn't add value**:
- Your actions already work fine
- MCP adds JSON-RPC overhead
- No external systems benefit (they're internal)
- Your AI doesn't call tools directly (it plans, then system executes)
**This is what the current proposal does** - and it doesn't add real value.
---
## Honest Assessment: When to Use MCP
### ✅ Use MCP When:
1. **Exposing tools to external AI systems**
- Claude Desktop wants to use your actions
- GPT-4 wants to call your tools
- Any external AI system needs your capabilities
2. **Using external MCP servers as tools**
- Slack MCP server for notifications
- GitHub MCP server for code operations
- Postgres MCP server for database access
3. **Changing architecture to direct AI tool calling**
- Remove planning phase
- AI directly discovers and calls tools
- (But this loses your two-stage planning benefits)
### ❌ Don't Use MCP When:
1. **Just wrapping internal actions**
- Actions already work fine
- No external systems need them
- Adds overhead without benefit
2. **Keeping current planning + execution architecture**
- MCP assumes direct AI tool calling
- Your architecture separates planning from execution
- MCP doesn't fit this pattern
---
## Revised Recommendation
### Option A: External Tool Integration Only (Recommended)
**Use MCP only for external tools**, not for wrapping internal actions:
```python
# Use external MCP servers as tools
class MethodSlack(MethodBase):
def __init__(self, services):
super().__init__(services)
self.mcpClient = ExternalMcpClient("http://slack-mcp-server:8080")
@action
async def sendMessage(self, parameters: Dict) -> ActionResult:
# Call external MCP tool
result = await self.mcpClient.callTool("slack.sendMessage", parameters)
return self._mcpResultToActionResult(result)
```
**Value**: ✅ Easy integration of external services
---
### Option B: Expose Actions to External AI (If Needed)
**Only if you want external AI systems to use your actions**:
```python
# Create MCP server that external AI can connect to
mcpServer = MethodMcpServer(methodInstance)
# External AI (Claude Desktop) connects and discovers tools
tools = await mcpServer.listTools()
result = await mcpServer.callTool("poweron.ai.process", {...})
```
**Value**: ✅ Enables external AI systems to use your capabilities
---
### Option C: Skip MCP for Internal Actions (Current Recommendation)
**Don't wrap internal actions as MCP tools** - they already work fine:
```python
# Keep existing architecture
methodInstance = methods["ai"]['instance']
actionMethod = methodInstance.actions["process"]['method']
result = await actionMethod(parameters=parameters)
```
**Value**: ✅ No overhead, works perfectly
---
## Conclusion
**MCP is independent of AI calls**, but its **primary value** is when AI models can discover and call tools directly.
**For your architecture**:
- ❌ **Don't wrap internal actions** as MCP tools (adds overhead, no benefit)
- ✅ **Do use external MCP servers** as tools (easy integration)
- ✅ **Do expose actions via MCP** if external AI systems need them
**The current proposal adds overhead without real value** because:
1. Your actions already work fine
2. Your AI doesn't call tools directly (it plans, then system executes)
3. No external systems benefit from wrapping internal actions
**Better approach**: Use MCP only for external tool integration, not for wrapping internal actions.