wiki/appdoc/ai_plan_architecture.md
2026-02-04 10:13:46 +01:00

Enhanced AI Workflow Architecture

Overview

This document describes the enhanced architecture after implementing all refactoring improvements:

  1. Execution state in ChatWorkflow
  2. Removed SimpleNamespace workaround
  3. Structured JSON parsing
  4. Typed document references
  5. Separated document extraction from AI calls
  6. Complete Pydantic model coverage
  7. Workflow-level architecture (request → task → delivery)

Note: This document focuses on function-level architecture. For workflow-level architecture (phases, task execution, document persistence), see ai_plan.md, Phases 1-6.


Architecture Principles

1. State Management

  • All execution state (currentRound, currentTask, currentAction) lives in the ChatWorkflow object
  • No separate int index parameters are passed through call chains
  • Single source of truth for workflow state
  • Execution model: a workflow runs as a single instance with no parallel execution; tasks and actions run sequentially

2. Type Safety

  • All parameters and returns use Pydantic models
  • No Dict[str, Any] or str returns
  • Runtime validation of model data by Pydantic (type hints additionally allow static type checking)

3. Separation of Concerns

  • Document extraction is separate action (not part of AI calls)
  • AI calls receive already-extracted ContentPart objects
  • Clear workflow: Extract → AI → Generate → Render

4. Modularity

  • Reusable extracted content
  • No tight coupling between extraction and AI
  • Clean function signatures

Enhanced Data Models

Workflow-Level Models (New)

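from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

# ChatDocument, ExtractionOptions and ActionResult are existing project models

class RequestContext(BaseModel):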
    """Normalized request context from user input"""
    originalPrompt: str = Field(description="Original user prompt")
    documents: List[ChatDocument] = Field(
        default_factory=list,
        description="Documents provided by user"
    )
    userLanguage: str = Field(description="User's language")
    detectedComplexity: str = Field(
        description="Complexity level: simple, moderate, complex"
    )
    requiresDocuments: bool = Field(default=False)
    requiresWebResearch: bool = Field(default=False)
    requiresAnalysis: bool = Field(default=False)
    expectedOutputFormat: Optional[str] = Field(None)
    expectedOutputType: Optional[str] = Field(None)  # "answer", "document", "analysis"

class UnderstandingResult(BaseModel):
    """Result from initial understanding phase (combined AI call)"""
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Basic parameters (language, format, detail level)"
    )
    intention: Dict[str, Any] = Field(
        default_factory=dict,
        description="User intention (primaryGoal, secondaryGoals, intentionType)"
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Extracted context (topics, requirements, constraints)"
    )
    documentReferences: List[Dict[str, Any]] = Field(
        default_factory=list,
        description="Document references with purpose and relevance"
    )
    tasks: List["TaskDefinition"] = Field(  # forward reference: TaskDefinition is defined below
        default_factory=list,
        description="Task definitions with deliverables"
    )

class TaskDefinition(BaseModel):
    """Task definition from understanding phase"""
    id: str = Field(description="Task identifier")
    objective: str = Field(description="Task objective")
    deliverable: Dict[str, Any] = Field(
        description="Deliverable specification (type, format, style, detailLevel)"
    )
    requiresWebResearch: bool = Field(default=False)
    requiresDocumentAnalysis: bool = Field(default=False)
    requiresContentGeneration: bool = Field(default=True)
    requiredDocuments: List[str] = Field(
        default_factory=list,
        description="Document references needed for this task"
    )
    extractionOptions: Optional[ExtractionOptions] = Field(
        None,
        description="Extraction options for document processing (determined dynamically based on task and document characteristics)"
    )

class TaskResult(BaseModel):
    """Result from task execution"""
    taskId: str = Field(description="Task identifier")
    actionResult: ActionResult = Field(description="ActionResult from task execution")

Note: These models support the workflow-level architecture described in ai_plan.md, Phases 1-6.


ChatWorkflow (Enhanced)

class ChatWorkflow(BaseModel):
    # ... existing fields ...
    
    # Execution state (NEW)
    # A workflow has rounds of activity. Each user input initiates a new round.
    currentRound: int = 0          # Current round (each user input = new round)
    currentTask: int = 0            # Current task within current round
    currentAction: int = 0          # Current action within current task
    
    def getRoundIndex(self) -> int:
        return self.currentRound
    
    def getTaskIndex(self) -> int:
        return self.currentTask
    
    def getActionIndex(self) -> int:
        return self.currentAction
    
    def incrementRound(self):
        """Increment round when new user input received"""
        self.currentRound += 1
        self.currentTask = 0
        self.currentAction = 0
    
    def incrementTask(self):
        """Increment task when starting new task in current round"""
        self.currentTask += 1
        self.currentAction = 0
    
    def incrementAction(self):
        """Increment action when executing new action in current task"""
        self.currentAction += 1

TaskContext (Enhanced)

class TaskContext(BaseModel):
    # ... existing fields ...
    
    # Stage 2 context fields (NEW)
    actionObjective: Optional[str] = None
    parametersContext: Optional[str] = None
    learnings: List[str] = Field(default_factory=list)
    stage1Selection: Optional[Dict[str, Any]] = None
    
    def updateFromSelection(self, selection: ActionDefinition):
        """Update context from Stage 1 selection"""
        self.actionObjective = selection.actionObjective
        self.parametersContext = selection.parametersContext
        self.learnings = selection.learnings
        self.stage1Selection = selection.model_dump()

Document References (New)

class DocumentReference(BaseModel):
    """Base class for document references"""
    pass

class DocumentListReference(DocumentReference):
    """Reference to a document list via message label"""
    messageId: Optional[str] = None
    label: str
    
    def to_string(self) -> str:
        if self.messageId:
            return f"docList:{self.messageId}:{self.label}"
        return f"docList:{self.label}"

class DocumentItemReference(DocumentReference):
    """Reference to a specific document item"""
    documentId: str
    fileName: Optional[str] = None
    
    def to_string(self) -> str:
        if self.fileName:
            return f"docItem:{self.documentId}:{self.fileName}"
        return f"docItem:{self.documentId}"

class DocumentReferenceList(BaseModel):
    """List of document references"""
    references: List[Union[DocumentListReference, DocumentItemReference]]  # concrete types, so model_dump() keeps subclass fields
    
    def to_string_list(self) -> List[str]:
        return [ref.to_string() for ref in self.references]
    
    @classmethod
    def from_string_list(cls, stringList: List[str]) -> "DocumentReferenceList":
        """Parse string list to typed references"""
        # ... parsing logic ...
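The parsing logic inside from_string_list() is elided above. As an illustration only, here is a minimal sketch of the inverse of the two to_string() formats, assuming those formats are the only inputs and that only the last segment (label or fileName) may contain ':'. Plain dataclasses stand in for the Pydantic models so the example runs without third-party packages; parseDocumentReference and fromStringList are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class DocumentListReference:  # dataclass stand-in for the Pydantic model
    label: str
    messageId: Optional[str] = None

    def to_string(self) -> str:
        if self.messageId:
            return f"docList:{self.messageId}:{self.label}"
        return f"docList:{self.label}"


@dataclass
class DocumentItemReference:  # dataclass stand-in for the Pydantic model
    documentId: str
    fileName: Optional[str] = None

    def to_string(self) -> str:
        if self.fileName:
            return f"docItem:{self.documentId}:{self.fileName}"
        return f"docItem:{self.documentId}"


DocumentRef = Union[DocumentListReference, DocumentItemReference]


def parseDocumentReference(raw: str) -> DocumentRef:
    """Hypothetical helper: inverse of to_string() for a single reference."""
    # At most three parts: prefix, first field, remainder (may contain ':').
    parts = raw.strip().split(":", 2)
    prefix, fields = parts[0], parts[1:]
    if not fields or not fields[0]:
        raise ValueError(f"Malformed document reference: {raw!r}")
    if prefix == "docList":
        if len(fields) == 2:
            return DocumentListReference(messageId=fields[0], label=fields[1])
        return DocumentListReference(label=fields[0])
    if prefix == "docItem":
        fileName = fields[1] if len(fields) == 2 else None
        return DocumentItemReference(documentId=fields[0], fileName=fileName)
    raise ValueError(f"Unknown reference prefix {prefix!r} in {raw!r}")


def fromStringList(stringList: List[str]) -> List[DocumentRef]:
    return [parseDocumentReference(s) for s in stringList]
```

Raising on an unknown prefix, instead of skipping the entry, is a design choice that lets reference validation report bad Stage 1 output early.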

Response Models (New)

class ActionDefinition(BaseModel):
    """Action definition with selection and parameters from planning phase"""
    # Core action selection (Stage 1)
    action: str = Field(description="Compound action name (method.action)")
    actionObjective: str = Field(description="Objective for this action")
    parametersContext: Optional[str] = Field(
        None,
        description="Context for parameter generation"
    )
    learnings: List[str] = Field(
        default_factory=list,
        description="Learnings from previous actions"
    )
    
    # Resources (ALWAYS defined in Stage 1 if action needs them)
    documentList: Optional[DocumentReferenceList] = Field(
        None,
        description="Document references (ALWAYS defined in Stage 1 if action needs documents)"
    )
    connectionReference: Optional[str] = Field(
        None,
        description="Connection reference (ALWAYS defined in Stage 1 if action needs connection)"
    )
    
    # Parameters (may be defined in Stage 1 OR Stage 2, depending on action and actionObjective)
    parameters: Optional[Dict[str, Any]] = Field(
        None,
        description="Action-specific parameters (generated in Stage 2 for complex actions, or inferred from actionObjective for simple actions)"
    )
    
    def hasParameters(self) -> bool:
        """Check if parameters have been generated (Stage 2 complete or inferred)"""
        return self.parameters is not None
    
    def needsStage2(self) -> bool:
        """Determine if Stage 2 parameter generation is needed (generic, deterministic check)
        
        Generic logic (works for any action, dynamically added or removed):
        - If parameters are already set → Stage 2 not needed
        - If parameters are None → Stage 2 needed (to generate parameters from actionObjective and context)
        
        Note: Stage 1 always defines documentList and connectionReference if the action needs them.
        Stage 2 only generates the action-specific parameters dictionary.
        """
        # Generic check: if parameters are not set, Stage 2 is needed
        return self.parameters is None
    

class AiResponseMetadata(BaseModel):
    """Metadata for AI response (varies by operation type)."""
    # Document Generation Metadata
    title: Optional[str] = Field(None, description="Document title")
    filename: Optional[str] = Field(None, description="Document filename")
    
    # Operation-Specific Metadata
    operationType: Optional[str] = Field(None, description="Type of operation performed")
    schema: Optional[str] = Field(None, description="Schema version (e.g., 'parameters_v1')")
    extractionMethod: Optional[str] = Field(None, description="Method used for extraction")
    sourceDocuments: Optional[List[str]] = Field(None, description="Source document references")
    
    # Additional metadata (for extensibility)
    additionalData: Optional[Dict[str, Any]] = Field(None, description="Additional operation-specific metadata")
    
    @classmethod
    def fromDict(cls, data: Optional[Dict[str, Any]]) -> Optional["AiResponseMetadata"]:
        """Create AiResponseMetadata from dict."""

class AiResponse(BaseModel):
    """Unified response from all AI calls (planning, text, documents)"""
    content: str = Field(description="Response content (JSON string for planning, text for analysis, unified JSON for documents)")
    metadata: Optional[AiResponseMetadata] = Field(
        None,
        description="Response metadata (varies by operation type)"
    )
    documents: Optional[List["DocumentData"]] = Field(  # forward reference: DocumentData is defined below
        None,
        description="Generated documents (only for document generation operations)"
    )
    
    def toJson(self) -> Dict[str, Any]:
        """
        Convert AI response content to JSON using enhanced stabilizing failsafe conversion methods.
        Centralizes AI result to JSON conversion in one place.
        
        Uses methods from jsonUtils:
        - tryParseJson() - Safe parsing with error handling
        - repairBrokenJson() - Repairs broken/incomplete JSON
        - extractJsonString() - Extracts JSON from text with code fences
        
        Returns:
            Dict containing the parsed JSON content, or a safe fallback structure if parsing fails.
            - If content is valid JSON dict: returns the dict directly
            - If content is valid JSON list: wraps in {"data": [...]}
            - If content is broken JSON: attempts repair using repairBrokenJson()
            - If all parsing fails: returns {"content": "...", "parseError": True}
        """

class DocumentData(BaseModel):
    """Single document in response"""
    documentName: str
    documentData: Any  # Can be str, bytes, dict, etc.
    mimeType: str
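The fallback chain described in the AiResponse.toJson() docstring can be sketched with the standard json module. The three helpers below are simplified, hypothetical stand-ins for jsonUtils.extractJsonString(), tryParseJson() and repairBrokenJson(); their real signatures and repair strategy are not shown in this document. The repair step here only closes an open string and unbalanced brackets, which assumes "broken JSON" mostly means truncated output.

```python
import json
import re
from typing import Any, Dict, Optional

FENCE = "`" * 3  # triple backtick, as used around fenced code in model output
_FENCED_JSON = re.compile(re.escape(FENCE) + r"(?:json)?\s*(.*?)\s*" + re.escape(FENCE), re.DOTALL)


def extractJsonStringSketch(text: str) -> str:
    # Prefer the body of a fenced block if the model wrapped its output in one.
    match = _FENCED_JSON.search(text)
    return match.group(1) if match else text.strip()


def tryParseJsonSketch(text: str) -> Optional[Any]:
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None


def repairBrokenJsonSketch(text: str) -> str:
    # Naive repair for truncated output: close an open string, then close
    # every bracket or brace that is still open, innermost first.
    closers, inString, escaped = [], False, False
    for ch in text:
        if inString:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                inString = False
        elif ch == '"':
            inString = True
        elif ch in "{[":
            closers.append("}" if ch == "{" else "]")
        elif ch in "}]" and closers:
            closers.pop()
    return text + ('"' if inString else "") + "".join(reversed(closers))


def toJsonSketch(content: str) -> Dict[str, Any]:
    candidate = extractJsonStringSketch(content)
    parsed = tryParseJsonSketch(candidate)
    if parsed is None:
        parsed = tryParseJsonSketch(repairBrokenJsonSketch(candidate))
    if isinstance(parsed, dict):
        return parsed
    if isinstance(parsed, list):
        return {"data": parsed}
    # Anything else (unparseable text or a bare JSON scalar) uses the fallback.
    return {"content": content, "parseError": True}
```

Treating a bare JSON scalar as a parse error is an assumption; the docstring only covers dicts, lists, broken JSON and total failure.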

class AiProcessParameters(BaseModel):
    """Parameters for AI processing action.
    
    This model is defined together with the `methodAi.process()` action function.
    All action parameter models follow this pattern: defined in the same module as the action.
    """
    aiPrompt: str = Field(description="AI instruction prompt")
    contentParts: Optional[List[ContentPart]] = Field(
        None,
        description="Already-extracted content parts (required if documents need to be processed)"
    )
    resultType: str = Field(
        default="txt",
        description="Output file extension (txt, json, pdf, docx, xlsx, etc.)"
    )

Action Registry Pattern: Each action function is defined together with its parameter Pydantic model in the same module. The action registry automatically discovers both the action function and its parameter model. This ensures:

  • Type safety: Parameters are validated at runtime
  • Co-location: Action and its parameters are defined together
  • Extensibility: New actions can be added by creating new methods with parameter models
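One way a registry could discover both the action function and its parameter model is to read the type hint of the function's parameters argument. A minimal sketch, assuming a hypothetical registerAction decorator and a module-level ACTION_REGISTRY dict (the real discovery mechanism is not described here); a dataclass stands in for the Pydantic AiProcessParameters so the example has no third-party dependencies.

```python
import asyncio
import typing
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry: "method.action" -> (action function, parameter model)
ACTION_REGISTRY: Dict[str, Tuple[Callable[..., Any], type]] = {}


def registerAction(methodName: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator that records an action and its parameter model."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Discover the parameter model from the type hint of `parameters`.
        paramModel = typing.get_type_hints(func).get("parameters")
        if paramModel is None:
            raise TypeError(f"{func.__name__} must annotate its 'parameters' argument")
        ACTION_REGISTRY[f"{methodName}.{func.__name__}"] = (func, paramModel)
        return func
    return decorator


@dataclass
class AiProcessParameters:  # dataclass stand-in for the Pydantic model
    aiPrompt: str
    resultType: str = "txt"


# Action and parameter model are defined in the same module, per the pattern.
@registerAction("ai")
async def process(parameters: AiProcessParameters) -> str:
    return f"{parameters.resultType}: {parameters.aiPrompt}"


if __name__ == "__main__":
    func, model = ACTION_REGISTRY["ai.process"]
    print(model.__name__, asyncio.run(func(model(aiPrompt="Summarise"))))
```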

Metadata Model:

The AiResponseMetadata Pydantic model provides type-safe access to metadata fields:

  1. Document Generation Metadata:

    • title: Optional[str] - Document title
    • filename: Optional[str] - Document filename
  2. Operation-Specific Metadata:

    • operationType: Optional[str] - Type of operation performed
    • schema: Optional[str] - Schema version (e.g., "parameters_v1")
    • extractionMethod: Optional[str] - Method used for extraction
    • sourceDocuments: Optional[List[str]] - Source document references
  3. Extensibility:

    • additionalData: Optional[Dict[str, Any]] - Additional operation-specific metadata

Note: AI Processing Metadata (aiResult, chunk, bytesSent, bytesReceived) is stored in ContentPart metadata, not in AiResponse metadata. Model name, price, and processing time are tracked separately (e.g., in ChatStat) and not included in response metadata.

Usage:

  • Use AiResponseMetadata.fromDict(dict) to create from dict with camelCase field names
  • All field names must be in camelCase format
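The body of fromDict() is not shown above. A minimal sketch, assuming that None maps to None and that keys which are not declared fields (for example snake_case variants) are collected into additionalData rather than rejected; the document does not say how unknown keys are handled. A dataclass stands in for the Pydantic model.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, List, Optional


@dataclass
class AiResponseMetadata:  # dataclass stand-in for the Pydantic model
    title: Optional[str] = None
    filename: Optional[str] = None
    operationType: Optional[str] = None
    schema: Optional[str] = None
    extractionMethod: Optional[str] = None
    sourceDocuments: Optional[List[str]] = None
    additionalData: Optional[Dict[str, Any]] = None

    @classmethod
    def fromDict(cls, data: Optional[Dict[str, Any]]) -> Optional["AiResponseMetadata"]:
        if data is None:
            return None
        known = {f.name for f in fields(cls)}
        values = {k: v for k, v in data.items() if k in known and k != "additionalData"}
        # Assumption: undeclared keys are preserved in additionalData.
        extra = dict(data.get("additionalData") or {})
        extra.update({k: v for k, v in data.items() if k not in known})
        return cls(**values, additionalData=extra or None)
```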

Workflow-Level Architecture

High-Level Flow

User Request (prompt + documents)
  ↓
Request Reception → RequestContext
  ↓
Complexity Detection (AI-based semantic) → "simple" | "moderate" | "complex"
  ↓
[If simple] Fast Path
  ├─> fastPathExecute() → ActionResult
  └─> Return to user (5-15s)
  ↓
[If complex] Full Workflow
  ├─> initialUnderstanding() → UnderstandingResult
  ├─> Create TaskDefinition[] from UnderstandingResult
  ├─> For each task:
  │   ├─> executeTask(TaskDefinition) → TaskResult
  │   │   ├─> Web Research (if needed)
  │   │   ├─> Document Extraction (separate action)
  │   │   ├─> Information Analysis (AI with extracted content)
  │   │   ├─> Content Generation (AI with extracted content)
  │   │   └─> Format Rendering (unified JSON → format)
  │   └─> persistTaskResult() → ChatMessage
  └─> Return to user (30-120s)
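The branch at the top of this flow can be sketched as a small async dispatcher. Assumptions: the stubs stand in for fastPathExecute(), initialUnderstanding(), executeTask() and persistTaskResult() with simplified return types, and "moderate" requests are routed to the full workflow because the diagram only shows the simple and complex branches.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List


@dataclass
class TaskDefinition:  # simplified stand-in
    id: str
    objective: str


@dataclass
class UnderstandingResult:  # simplified stand-in
    tasks: List[TaskDefinition] = field(default_factory=list)


async def fastPathExecute(prompt: str) -> str:
    return f"answer to: {prompt}"


async def initialUnderstanding(prompt: str) -> UnderstandingResult:
    return UnderstandingResult(tasks=[TaskDefinition("task_1", "research"),
                                      TaskDefinition("task_2", "write report")])


async def executeTask(task: TaskDefinition) -> str:
    return f"{task.id} done"


async def persistTaskResult(result: str) -> str:
    return f"message({result})"


async def handleRequest(prompt: str, complexity: str) -> List[str]:
    if complexity == "simple":
        return [await fastPathExecute(prompt)]
    # "moderate" and "complex" both take the full workflow (assumption).
    understanding = await initialUnderstanding(prompt)
    messages = []
    for task in understanding.tasks:  # sequential, never in parallel
        result = await executeTask(task)
        messages.append(await persistTaskResult(result))
    return messages


if __name__ == "__main__":
    print(asyncio.run(handleRequest("Draft a report", "complex")))
```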

Key Workflow Functions (see ai_plan.md for detailed implementation):

  1. Request Reception:

    • receiveRequest(prompt, documents) → RequestContext
    • Normalizes user input
  2. Complexity Detection:

    • detectComplexity(prompt, documents) → str ("simple" | "moderate" | "complex")
    • AI-based semantic understanding (language-agnostic)
  3. Fast Path (simple requests):

    • fastPathExecute(context) → ActionResult
    • Single AI call: understand + execute + deliver
  4. Initial Understanding (complex requests):

    • initialUnderstanding(context) → UnderstandingResult
    • Combined AI call: parameters + intention + context + tasks
  5. Task Execution:

    • executeTask(task: TaskDefinition) → TaskResult
    • Structured sequence: extract → analyze → generate → render
  6. Document Persistence:

    • persistTaskResult(taskResult) → ChatMessage
    • Persists documents for cross-task/round references

Document Persistence Strategy:

  • Within running task: ActionResult + ActionDocument (NO ChatDocuments, documents passed directly)
  • Between tasks/rounds: Persist via ChatMessage + ChatDocuments (required for docList: references)
  • User delivery: Full ChatMessage (user language) + ChatDocuments
  • Process automation: Minimal ChatMessage (no user text, just document storage) + ChatDocuments

Enhanced Function Call Reference

Entry Point: WorkflowProcessor

workflowProcessor.executeTask()

Signature (ENHANCED):

async def executeTask(
    taskStep: TaskStep,           # Pydantic: TaskStep
    workflow: ChatWorkflow,       # Pydantic: ChatWorkflow (contains execution state)
    context: TaskContext          # Pydantic: TaskContext
) -> TaskResult                   # Pydantic: TaskResult

Characteristics:

  • Execution state managed in workflow object
  • Clean signature with minimal parameters

Calls:

  1. workflow.getRoundIndex() - Get round index from workflow
  2. workflow.getTaskIndex() - Get task index from workflow
  3. self.mode.executeTask(taskStep, workflow, context) - Delegate to mode

Mode Layer: Dynamic Mode

modeDynamic.executeTask()

Signature (ENHANCED):

async def executeTask(
    taskStep: TaskStep,           # Pydantic: TaskStep
    workflow: ChatWorkflow,       # Pydantic: ChatWorkflow (contains execution state)
    context: TaskContext          # Pydantic: TaskContext (enhanced with Stage 2 fields)
) -> TaskResult                   # Pydantic: TaskResult

Characteristics:

  • Execution state managed in workflow object
  • Uses workflow.getRoundIndex(), workflow.getTaskIndex(), workflow.getActionIndex()

Calls (in sequence):

  1. workflow.getRoundIndex() - Get round index
  2. workflow.getTaskIndex() - Get task index
  3. await self._planSelect(context) → ActionDefinition - Select action
  4. await self._actExecute(context, selection, taskStep, workflow) → ActionResult - Execute action
  5. self._observeBuild(result) → Observation - Build observation
  6. await self._refineDecide(context, observation) → ReviewResult - Decide continue/stop
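Steps 3 to 6 form one iteration of a plan, act, observe, refine loop that repeats until the refine step decides to stop. A minimal sketch with hypothetical stubs: the context, taskStep and workflow arguments are omitted, and the shapes of Observation and ReviewResult as well as the maxSteps guard are assumptions not stated in this document.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class Observation:  # simplified stand-in
    success: bool
    summary: str


@dataclass
class ReviewResult:  # simplified stand-in
    shouldContinue: bool


class DynamicModeSketch:
    def __init__(self, plannedActions: List[str]):
        self.plannedActions = plannedActions
        self.executed: List[str] = []

    async def _planSelect(self) -> str:
        return self.plannedActions[len(self.executed)]

    async def _actExecute(self, action: str) -> str:
        self.executed.append(action)
        return f"{action}: ok"

    def _observeBuild(self, result: str) -> Observation:
        return Observation(success=result.endswith("ok"), summary=result)

    async def _refineDecide(self, observation: Observation) -> ReviewResult:
        # Stand-in for the AI review: stop once every planned action has run.
        return ReviewResult(shouldContinue=len(self.executed) < len(self.plannedActions))

    async def executeTask(self, maxSteps: int = 10) -> List[Observation]:
        observations: List[Observation] = []
        for _ in range(maxSteps):  # guard against a review that never says stop
            action = await self._planSelect()
            result = await self._actExecute(action)
            observation = self._observeBuild(result)
            observations.append(observation)
            if not (await self._refineDecide(observation)).shouldContinue:
                break
        return observations


if __name__ == "__main__":
    mode = DynamicModeSketch(["document.extractContent", "ai.process"])
    print([o.summary for o in asyncio.run(mode.executeTask())])
```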

modeDynamic._planSelect()

Signature (ENHANCED):

async def _planSelect(
    context: TaskContext          # Pydantic: TaskContext
) -> ActionDefinition  # Pydantic: ActionDefinition

Characteristics:

  • Returns ActionDefinition model
  • Uses parseJsonWithModel() for structured parsing

Calls:

  1. generateDynamicPlanSelectionPrompt(...) → PromptBundle
  2. self.services.ai.callAiPlanning(...) → AiResponse
  3. parseJsonWithModel(response.content, ActionDefinition) → ActionDefinition
    • Note: If Stage 1 AI returns string references, convert immediately:
      • selection.documentList = DocumentReferenceList.from_string_list(stringRefs) if stringRefs provided
      • selection.connectionReference = stringRef if stringRef provided
  4. Validate required resources: Check if action requires documents/connection and ensure they're present in selection
    • If action requires documents but selection.documentList is None → Error
    • If action requires connection but selection.connectionReference is None → Error
  5. self._validateDocumentReferences(selection.documentList, context) - Validate typed references (if documents present)
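The required-resource check in step 4 can be sketched as a fail-fast function. The ACTION_REQUIREMENTS table, its flags and the simplified Selection stand-in are hypothetical; in the real code the requirements would come from the action registry, and documentList would already be a DocumentReferenceList rather than a list of strings.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ActionRequirements:  # hypothetical per-action flags
    needsDocuments: bool = False
    needsConnection: bool = False


# Illustrative entries only; not taken from the real registry.
ACTION_REQUIREMENTS: Dict[str, ActionRequirements] = {
    "document.extractContent": ActionRequirements(needsDocuments=True),
    "outlook.composeAndDraftEmailWithContext": ActionRequirements(needsConnection=True),
    "ai.process": ActionRequirements(),
}


@dataclass
class Selection:  # simplified stand-in for ActionDefinition
    action: str
    documentList: Optional[List[str]] = None
    connectionReference: Optional[str] = None


def validateRequiredResources(selection: Selection) -> None:
    req = ACTION_REQUIREMENTS.get(selection.action)
    if req is None:
        raise ValueError(f"Unknown action: {selection.action}")
    if req.needsDocuments and selection.documentList is None:
        raise ValueError(f"{selection.action} requires documents but documentList is missing")
    if req.needsConnection and selection.connectionReference is None:
        raise ValueError(f"{selection.action} requires a connection but connectionReference is missing")
```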

modeDynamic._actExecute()

Signature (ENHANCED):

async def _actExecute(
    context: TaskContext,                    # Pydantic: TaskContext (enhanced, no workaround)
    selection: ActionDefinition, # Pydantic: ActionDefinition (combined model)
    taskStep: TaskStep,                      # Pydantic: TaskStep
    workflow: ChatWorkflow                   # Pydantic: ChatWorkflow (contains currentRound, currentTask, currentAction)
) -> ActionResult                            # Pydantic: ActionResult

Characteristics:

  • Receives ActionDefinition model (combined action selection and parameters)
  • Uses enhanced TaskContext directly
  • Uses workflow.getActionIndex() for action index

Calls (in sequence):

  1. context.updateFromSelection(selection) - Update context from selection
  2. workflow.getActionIndex() - Get action index from workflow
  3. If selection.needsStage2() (Stage 2 needed):
    • Why this check? Stage 1 (_planSelect) ALWAYS defines documentList (if the action needs documents) and connectionReference (if the action needs a connection). Stage 2 is only needed for:
      • Actions that ALWAYS need Stage 2 (e.g., ai.webResearch, ai.generateReport, outlook.composeAndDraftEmailWithContext)
      • Actions where actionObjective is vague and parameters cannot be inferred
      • See action_parameters_analysis.md for the complete list of actions that need Stage 2
    • generateDynamicParametersPrompt(self.services, context, ...) - Use context directly
    • self.services.ai.callAiPlanning(...) → AiResponse
    • parseJsonWithModel(response.content, ActionDefinition) → Update selection with parameters
    • Note: Stage 2 generates detailed parameters but does NOT override documentList or connectionReference from Stage 1
  4. self.actionExecutor.executeSingleAction(action, workflow, taskStep) - No separate indices
    • Passes selection.documentList and selection.connectionReference to action (single set of fields)
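The rule that Stage 2 fills in parameters without overriding the Stage 1 resources can be expressed as a small merge. A sketch using a dataclass stand-in for ActionDefinition; applyStage2 is a hypothetical name, and silently ignoring any resource fields Stage 2 returns (rather than raising) is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, List, Optional


@dataclass
class ActionDefinition:  # dataclass stand-in for the Pydantic model
    action: str
    actionObjective: str
    documentList: Optional[List[str]] = None
    connectionReference: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None

    def needsStage2(self) -> bool:
        return self.parameters is None


def applyStage2(stage1: ActionDefinition, stage2: ActionDefinition) -> ActionDefinition:
    """Take only the parameters from Stage 2; keep Stage 1's resources."""
    if not stage1.needsStage2():
        return stage1
    # replace() returns a new object, so the Stage 1 selection stays unchanged.
    return replace(stage1, parameters=dict(stage2.parameters or {}))
```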

Action Executor Layer

actionExecutor.executeSingleAction()

Signature (ENHANCED):

async def executeSingleAction(
    action: ActionItem,           # Pydantic: ActionItem
    workflow: ChatWorkflow,       # Pydantic: ChatWorkflow (contains all indices)
    taskStep: TaskStep            # Pydantic: TaskStep
) -> ActionResult                 # Pydantic: ActionResult

Characteristics:

  • Execution state managed in workflow object
  • Uses workflow.getRoundIndex(), workflow.getTaskIndex(), workflow.getActionIndex()

Calls:

  1. workflow.getRoundIndex() - Get round index
  2. workflow.getTaskIndex() - Get task index
  3. workflow.getActionIndex() - Get action index
  4. self.executeAction(methodName, actionName, parameters) → ActionResult
  5. workflow.incrementAction() - Update workflow state

actionExecutor.executeAction()

Signature (ENHANCED):

async def executeAction(
    methodName: str,              # Proprietary: str (could be enum)
    actionName: str,               # Proprietary: str (could be enum)
    selection: ActionDefinition  # Pydantic: ActionDefinition (combined model)
) -> ActionResult                            # Pydantic: ActionResult

Characteristics:

  • Receives ActionDefinition model (combined action selection and parameters)
  • Type-safe parameter handling

Calls:

  1. methods[methodName] - Get method from registry
  2. method['actions'][actionName] - Get action
  3. Validate parameters against action's parameter schema (from action registry)
  4. action['method'](selection.parameters or {}) → ActionResult - Execute action

Action Registry: Actions are registered with their parameter schemas (Pydantic models). Each action function is defined together with its parameter model in the same module (e.g., methodAi.py defines both process() function and AiProcessParameters model).

Parameter Validation: Before executing an action, parameters are validated against the action's parameter schema from the registry. Invalid parameters cause action execution to fail with a clear error message.
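The lookup and validation steps can be sketched as below, turning a mismatch into a failed ActionResult with a readable message. This stdlib sketch only checks for missing and unknown keys, not value types; with Pydantic, model_validate() and ValidationError would do that job. The registry layout, the ActionResult shape, the synchronous call and the error wording are all hypothetical.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class ActionResult:  # simplified stand-in
    success: bool
    error: Optional[str] = None
    data: Any = None


@dataclass
class AiProcessParameters:  # dataclass stand-in for the Pydantic model
    aiPrompt: str
    resultType: str = "txt"


def process(parameters: AiProcessParameters) -> ActionResult:
    return ActionResult(success=True, data=f"[{parameters.resultType}] {parameters.aiPrompt}")


# Hypothetical registry layout: methods[methodName]["actions"][actionName]
methods: Dict[str, Dict[str, Any]] = {
    "ai": {"actions": {"process": {"method": process, "schema": AiProcessParameters}}},
}


def executeAction(methodName: str, actionName: str, raw: Optional[Dict[str, Any]]) -> ActionResult:
    action = methods.get(methodName, {}).get("actions", {}).get(actionName)
    if action is None:
        return ActionResult(success=False, error=f"Unknown action {methodName}.{actionName}")
    schema, raw = action["schema"], raw or {}
    declared = {f.name: f for f in fields(schema)}
    unknown = sorted(set(raw) - set(declared))
    missing = sorted(name for name, f in declared.items()
                     if f.default is MISSING and f.default_factory is MISSING and name not in raw)
    if unknown or missing:
        return ActionResult(success=False,
                            error=f"Invalid parameters for {methodName}.{actionName}: "
                                  f"missing={missing}, unknown={unknown}")
    return action["method"](schema(**raw))
```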


Method Layer: AI Method

methodAi.process()

Signature (ENHANCED):

async def process(
    parameters: AiProcessParameters  # Pydantic: AiProcessParameters
) -> ActionResult                    # Pydantic: ActionResult

Characteristics:

  • Receives AiProcessParameters model
  • Only supports contentParts (extraction must be done separately before calling this method)

Calls (in sequence):

  1. self.services.chat.progressLogStart(...) - Start progress
  2. self.services.ai.callAiContent(prompt, contentParts=parameters.contentParts, ...) → AiResponse
  3. AiResponse.toJson() → Dict[str, Any] - Convert response to JSON
  4. Build ActionDocument objects from response
  5. ActionResult(success=True, documents=actionDocuments)

Key Principles:

  • Extraction is separate: Documents must be extracted before calling this method using executeAction("document.extractContent", ...)
  • AI receives ContentParts: Only accepts already-extracted ContentPart objects
  • Clean separation: Extract → AI (2 separate steps)

AI Service Layer

Note: The AI service has 2 methods (not 3):

  1. callAiPlanning() - Specialized for planning tasks (static parameters, no documents)
  2. callAiContent() - Unified method for all content processing (replaces callAiDocuments() and callAiText())

The consolidation simplifies the API while maintaining clear separation between planning and content processing.

Service Dependencies:

  • Methods (e.g., methodAi, methodOutlook): Use self.services.xxx to access services (no direct dependencies)
  • Services: Have clear dependency hierarchy:
    • chatService depends on: aiService, generationService (but not vice versa)
    • aiService is independent (no dependencies on other services)
    • generationService is independent (no dependencies on other services)
    • extractionService is independent (no dependencies on other services)
  • Operation Types: Fixed enum (not configurable) - all operation types are defined in OperationTypeEnum

aiService.callAiContent()

Signature (ENHANCED):

async def callAiContent(
    prompt: str,                                         # Proprietary: str
    *,                                                   # remaining arguments are keyword-only
    contentParts: Optional[List[ContentPart]] = None,    # Pydantic: List[ContentPart] (required if documents need processing)
    options: AiCallOptions,                              # Pydantic: AiCallOptions (REQUIRED - operationType must be set)
    outputFormat: Optional[str] = None,                  # Proprietary: str (for document generation)
    title: Optional[str] = None                          # Proprietary: str (for document generation)
) -> AiResponse                                          # Pydantic: AiResponse

Purpose: Unified method for all content processing (text, documents, images, web operations). Replaces callAiDocuments() and callAiText().

Characteristics:

  • Only receives contentParts (already-extracted content)
  • Returns unified AiResponse model
  • NO extraction logic - contentParts must be extracted before calling this method
  • REQUIRED: options.operationType must be set (no automatic detection)
  • Handles all operation types: TEXT, DOCUMENT_GENERATE, IMAGE_GENERATE, WEB_SEARCH, WEB_CRAWL

Calls (in sequence):

  1. self.services.chat.progressLogStart(...) - Start progress
  2. Validate: options.operationType must be set (required parameter)
  3. If outputFormat is specified (document generation):
    • self._preparePromptWithContentParts(prompt, contentParts) → str - Build prompt
    • buildGenerationPrompt(outputFormat, prompt, title, extracted_content, ...) → str - Build generation prompt
    • self._callAiWithLooping(generation_prompt, options, ...) → str (JSON)
    • parseJsonWithModel(response, UnifiedJsonDocument) → UnifiedJsonDocument
    • generationService.renderReport(unifiedJson, outputFormat, ...) → (bytes, mimeType) - Render to format
    • AiResponse(content=unifiedJson.model_dump_json(), metadata=AiResponseMetadata(...), documents=[...]) → AiResponse
  4. Else (text processing, images, web operations):
    • self._preparePromptWithContentParts(prompt, contentParts) → str - Build prompt
    • self._callAiWithLooping(fullPrompt, options, ...) → str
    • AiResponse(content=response, metadata=None) → AiResponse
  5. self.services.chat.progressLogFinish(...) - Finish progress
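Steps 2 to 4 amount to a required-field check followed by a branch on outputFormat. A minimal sketch under stated assumptions: only the operation type names come from this document, while the enum string values, the AiCallOptions stand-in, the callModel stub (standing in for _callAiWithLooping()), the placeholder string in place of generationService.renderReport() and the plain-dict return value are hypothetical; progress logging is omitted.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


class OperationTypeEnum(str, Enum):  # names from this document, values assumed
    TEXT = "text"
    DOCUMENT_GENERATE = "documentGenerate"
    IMAGE_GENERATE = "imageGenerate"
    WEB_SEARCH = "webSearch"
    WEB_CRAWL = "webCrawl"


@dataclass
class AiCallOptions:  # simplified stand-in
    operationType: Optional[OperationTypeEnum] = None


def preparePrompt(prompt: str, contentParts: Optional[List[str]]) -> str:
    # Append already-extracted content; no extraction happens here.
    return "\n\n".join([prompt, *(contentParts or [])])


def callModel(fullPrompt: str, options: AiCallOptions) -> str:
    # Stub for _callAiWithLooping(); document generation returns JSON text.
    if options.operationType is OperationTypeEnum.DOCUMENT_GENERATE:
        return json.dumps({"sections": [{"text": fullPrompt}]})
    return f"response to {len(fullPrompt)} chars"


def callAiContentSketch(prompt: str, *, options: AiCallOptions,
                        contentParts: Optional[List[str]] = None,
                        outputFormat: Optional[str] = None) -> Dict[str, Any]:
    if options.operationType is None:
        raise ValueError("options.operationType must be set (no automatic detection)")
    fullPrompt = preparePrompt(prompt, contentParts)
    if outputFormat is not None:
        unified = json.loads(callModel(fullPrompt, options))  # parse before rendering
        rendered = f"<{outputFormat} with {len(unified['sections'])} section(s)>"
        return {"content": json.dumps(unified), "documents": [rendered]}
    return {"content": callModel(fullPrompt, options), "documents": None}
```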

Key Principles:

  • No extraction: ContentParts must be extracted before calling this method
  • Handles large documents: Chunking done in extraction, batching in AI if needed
  • Type-safe return: Unified AiResponse model
  • Unified interface: Single method for all content processing (text, documents, images, web)

aiService.callAiPlanning()

Signature (ENHANCED):

async def callAiPlanning(
    prompt: str,                          # Proprietary: str
    placeholders: Optional[List[PromptPlaceholder]] = None,  # Pydantic: List[PromptPlaceholder]
    debugType: Optional[str] = None        # Proprietary: str
) -> AiResponse                             # Pydantic: AiResponse

Characteristics:

  • Returns unified AiResponse model
  • Uses parseJsonWithModel() for structured parsing
  • modelName, priceCHF, processingTime tracked separately (not in response)

Calls:

  1. self._buildPromptWithPlaceholders(prompt, placeholdersDict) → str
  2. AiCallRequest(prompt=fullPrompt, context="", options=options) - Create request
  3. self.aiObjects.call(request) → AiCallResponse
  4. AiResponse(content=response.content, metadata=None) → AiResponse
    • Note: modelName, priceCHF, processingTime tracked separately (e.g., ChatStat)

Extraction Service Layer

Extraction Action: document.extractContent

Action Signature:

async def extractContent(
    parameters: ExtractContentParameters  # Pydantic: ExtractContentParameters
) -> ActionResult                         # Pydantic: ActionResult

Parameters Model:

class ExtractContentParameters(BaseModel):
    """Parameters for extraction action"""
    documentList: DocumentReferenceList = Field(description="Document references")
    extractionOptions: Optional[ExtractionOptions] = Field(
        None,
        description="Extraction options (dynamic, not hardcoded)"
    )

Returns:

  • ActionResult with ActionDocument containing ContentExtracted objects
  • ContentExtracted.parts contains List[ContentPart] (already chunked if needed)

Usage:

# Extraction options come from TaskDefinition (set during task creation)
extractionResult = await executeAction(
    action="document.extractContent",
    parameters=ExtractContentParameters(
        documentList=DocumentReferenceList(...),
        extractionOptions=task.extractionOptions  # From TaskDefinition
    )
)
# Extract ContentParts from result
contentParts = extractionResult.documents[0].parts  # List[ContentPart]

Extraction Options: Extraction options are part of TaskDefinition.extractionOptions and are determined dynamically based on:

  • Task requirements (from TaskDefinition)
  • Document characteristics (size, type, count)
  • Context requirements (from RequestContext)

# Extraction options are determined and set in TaskDefinition during task creation
# (before task execution begins)
task = TaskDefinition(
    id="task_1",
    objective="Analyze documents",
    deliverable={"type": "analysis", "format": "txt"},  # required field (no default); illustrative values
    extractionOptions=determineExtractionOptions(taskRequirements, documents, context)
    # extractionOptions is part of TaskDefinition model
)


Note: Extraction options are determined once per task and reused for all extraction operations in that task.

extractionService.extractContent() - Phase 1: Pure Content Extraction

Signature:

def extractContent(
    documents: List[ChatDocument],  # Pydantic: List[ChatDocument]
    options: ExtractionOptions     # Pydantic: ExtractionOptions
) -> List[ContentExtracted]        # Pydantic: List[ContentExtracted]

Purpose: Extract raw content from documents WITHOUT AI processing.

Characteristics:

  • No size limitations - Extracts complete documents regardless of size
  • No AI calls - Pure format-specific extraction (PDF, DOCX, etc.)
  • No chunking - Full content extracted as-is into ContentPart objects
  • Format-aware - Uses appropriate extractor for each document type

Key: This is Phase 1 - pure extraction. Called by the separate extraction action, not from AI calls.
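Format-aware extraction can be sketched as a dispatch table keyed by MIME type, where each extractor returns whole content parts with no chunking and no AI call. Only two trivial formats are shown; the ContentPart and ContentExtracted fields, the EXTRACTORS table and the fallback for unknown types are assumptions, and real PDF or DOCX extractors would need third-party libraries.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple


@dataclass
class ContentPart:  # simplified stand-in
    typeGroup: str
    data: str


@dataclass
class ContentExtracted:  # simplified stand-in
    documentName: str
    parts: List[ContentPart] = field(default_factory=list)


def extractText(raw: bytes) -> List[ContentPart]:
    return [ContentPart("text", raw.decode("utf-8"))]


def extractJson(raw: bytes) -> List[ContentPart]:
    # Re-serialise so the part holds normalised JSON text.
    return [ContentPart("structure", json.dumps(json.loads(raw), indent=2))]


EXTRACTORS: Dict[str, Callable[[bytes], List[ContentPart]]] = {
    "text/plain": extractText,
    "application/json": extractJson,
}


def extractContent(documents: List[Tuple[str, str, bytes]]) -> List[ContentExtracted]:
    results = []
    for name, mimeType, raw in documents:
        extractor = EXTRACTORS.get(mimeType)
        if extractor:
            parts = extractor(raw)  # full content, no size limit, no chunking
        else:
            parts = [ContentPart("binary", f"<{len(raw)} bytes of {mimeType}>")]
        results.append(ContentExtracted(name, parts))
    return results
```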


extractionService.processDocumentsPerChunk() - Phase 2: AI-Based Content Extraction

Signature:

async def processDocumentsPerChunk(
    documents: List[ChatDocument],  # Pydantic: List[ChatDocument]
    prompt: str,                     # Proprietary: str
    aiObjects: Any,                  # Proprietary: Any (AiObjects interface)
    options: Optional[AiCallOptions] = None,  # Pydantic: AiCallOptions
    operationId: Optional[str] = None         # Proprietary: str
) -> str                             # Merged AI results as string

Purpose: Process extracted content with AI using intelligent model-aware chunking.

Characteristics:

  • Two-phase approach: First extracts content (Phase 1), then processes with AI (Phase 2)
  • Model-aware chunking: Chunk size calculated dynamically per model
  • No fixed sizes: Only model's contextLength and maxTokens are limits
  • Per-call model selection: Each AI call can use different model (failover)
  • Dynamic recalculation: Chunking recalculated when model changes

Calls (in sequence):

  1. self.extractContent(documents, extractionOptions) → List[ContentExtracted] (Phase 1: Pure extraction, no chunking)
  2. self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId) → List[PartResult]
    • For each ContentPart (Pipeline Architecture):
      • Initialize pipeline: processedChunks = [], remainingContent = part.data
      • While remainingContent:
        • Select model (based on operationType, priority, etc.)
        • Calculate chunk size for CURRENT model: model.contextLength - reservedTokens
        • Create NEXT chunk on-demand (lazy): Extract chunk from remainingContent
        • Process chunk with AI (with failover if needed)
        • Update pipeline: Add result to processedChunks, remove from remainingContent
        • If failover: Recalculate chunk size for NEW model, continue with same remainingContent
      • Merge processedChunks → PartResult
  3. self._mergePartResults(partResults, options) → str (Intelligent merging)

Pipeline Benefits:

  • Memory efficient - Only one chunk in memory at a time
  • On-demand creation - Chunks created only when needed
  • Model-adaptive - Chunk size adapts when model changes
  • Resumable - Pipeline state allows resuming after failures

Key: This combines Phase 1 (pure extraction) and Phase 2 (AI processing with model-aware chunking).


Chat Service Layer

chatService.getChatDocumentsFromDocumentList()

Signature (ENHANCED):

def getChatDocumentsFromDocumentList(
    documentList: DocumentReferenceList  # Pydantic: DocumentReferenceList
) -> List[ChatDocument]                 # Pydantic: List[ChatDocument]

Characteristics:

  • Receives DocumentReferenceList model
  • Type-safe document references

Calls:

  1. documentList.to_string_list() - Convert to string list
  2. Loop through references:
    • Parse DocumentListReference or DocumentItemReference
    • Search workflow.messages for matching documentsLabel
    • Extract ChatDocument objects
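The lookup can be sketched as follows. It assumes the two reference shapes used for persisted task results (docList:label and docList:messageId:label). To keep it short, the sketch takes the already-flattened string list (the output of to_string_list()) and the message list as arguments instead of reading workflow.messages. It returns documents from every message whose label matches. DocumentItemReference parsing is left out because its string format is not shown here. All stand-in classes are hypothetical simplifications of the real models.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ChatDocument:
    fileName: str


@dataclass
class ChatMessage:
    id: str
    documentsLabel: Optional[str] = None
    documents: List[ChatDocument] = field(default_factory=list)


@dataclass
class DocumentListReference:
    label: str
    messageId: Optional[str] = None


def parseReference(ref: str) -> DocumentListReference:
    # Accepts "docList:label" or "docList:messageId:label"
    parts = ref.split(":")
    if parts[0] != "docList" or len(parts) not in (2, 3):
        raise ValueError(f"Unsupported document reference: {ref}")
    if len(parts) == 2:
        return DocumentListReference(label=parts[1])
    return DocumentListReference(label=parts[2], messageId=parts[1])


def getChatDocumentsFromDocumentList(
    references: List[str], messages: List[ChatMessage]
) -> List[ChatDocument]:
    documents: List[ChatDocument] = []
    for ref in references:
        parsed = parseReference(ref)
        for msg in messages:
            if msg.documentsLabel != parsed.label:
                continue
            # With a messageId, only that exact message qualifies
            if parsed.messageId is not None and msg.id != parsed.messageId:
                continue
            documents.extend(msg.documents)
    return documents
```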

chatService.storeMessageWithDocuments()

Signature (ENHANCED):

async def storeMessageWithDocuments(
    workflow: ChatWorkflow,         # Pydantic: ChatWorkflow
    messageData: MessageData,       # Pydantic: MessageData
    chatDocuments: List[ChatDocument]  # Pydantic: List[ChatDocument]
) -> ChatMessage                    # Pydantic: ChatMessage

Characteristics:

  • Receives MessageData model
  • Type-safe message creation

Calls:

  1. messageData.model_dump() - Convert to dict for database
  2. self.services.interfaceDbChat.storeMessage(...) - Store message
  3. Loop through chatDocuments:
    • self.services.interfaceDbChat.storeDocument(...) - Store document

Enhanced Call Flow

Workflow Entry Point

User Request (prompt + documents)
  ↓
workflowProcessor.receiveRequest(prompt, documents, workflow)
  ├─> Create RequestContext
  ├─> detectComplexity(prompt, documents) → "simple" | "moderate" | "complex"
  ├─> [If simple] fastPathExecute(context) → ActionResult
  │   └─> Return to user (5-15s)
  └─> [If complex] Full Workflow
      ├─> initialUnderstanding(context) → UnderstandingResult
      ├─> Create TaskDefinition[] from UnderstandingResult
      └─> For each task: executeTask(task) → TaskResult
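A minimal sketch of the entry-point routing. It assumes the complexity detector, the fast path and the full workflow are injected as callables; runFullWorkflow is a hypothetical name for the full-workflow branch. The diagram only shows a simple branch and a complex branch, so sending "moderate" requests to the full workflow is an assumption.

```python
from dataclasses import dataclass, field
from typing import Callable, List

VALID_COMPLEXITY = ("simple", "moderate", "complex")


@dataclass
class RequestContext:
    # Subset of the RequestContext model fields
    originalPrompt: str
    documents: List[str] = field(default_factory=list)
    detectedComplexity: str = "simple"


def receiveRequest(
    prompt: str,
    documents: List[str],
    detectComplexity: Callable[[str, List[str]], str],
    fastPathExecute: Callable[[RequestContext], str],
    runFullWorkflow: Callable[[RequestContext], str],
) -> str:
    complexity = detectComplexity(prompt, documents)
    if complexity not in VALID_COMPLEXITY:
        raise ValueError(f"Unknown complexity level: {complexity}")
    context = RequestContext(prompt, list(documents), complexity)
    if complexity == "simple":
        return fastPathExecute(context)
    # Assumption: "moderate" takes the full workflow like "complex"
    return runFullWorkflow(context)
```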

Simple Request (Fast Path)

User Request
  ↓
workflowProcessor.executeTask()
  ↓
modeDynamic.executeTask()
  ↓
_complexityDetection() → "simple"
  ↓
fastPathExecute()
  ↓
aiService.callAiContent(prompt, contentParts=None) → AiResponse
  ↓
Return TaskResult

Call Depth: 4 levels


Complex Request (Full Workflow)

High-Level Flow

User Request
  ↓
workflowProcessor.receiveRequest() → RequestContext
  ↓
initialUnderstanding(context) → UnderstandingResult
  ├─> aiService.callAiPlanning() → AiResponse (combined understanding)
  └─> parseJsonWithModel() → UnderstandingResult
  ↓
Create TaskDefinition[] from UnderstandingResult
  ↓
For each task: executeTask(TaskDefinition) → TaskResult
  ├─> [If requiresWebResearch] getWebInformation() → List[ContentPart]
  ├─> [If requiresDocumentAnalysis] executeAction("document.extractContent") → ActionResult ✅ SEPARATE
  │   └─> Get ContentParts from extractionResult.documents[0].parts
  ├─> [If webContent or extractedContent] analyzeInformation(contentParts) → List[ContentPart]
  │   └─> aiService.callAiContent(prompt, contentParts=...) → AiResponse ✅
  ├─> generateContent(contentParts) → Unified JSON
  │   └─> aiService.callAiContent(prompt, contentParts=..., outputFormat=...) → AiResponse ✅
  └─> [If format != "json"] renderFromUnifiedJson(unifiedJson, format) → bytes
  ↓
persistTaskResult(taskResult) → ChatMessage
  ├─> Create ChatDocuments from ActionDocument
  └─> storeMessageWithDocuments(workflow, messageData, chatDocuments) → ChatMessage

Action-Level Flow (Within Task)

modeDynamic.executeTask(taskStep, workflow, context)
  ↓
_planSelect(context) → ActionDefinition
  ├─> aiService.callAiPlanning() → AiResponse
  └─> parseJsonWithModel() → ActionDefinition ✅
  ↓
_actExecute(context, selection, ...) → ActionResult
  ├─> context.updateFromSelection(selection) ✅ (no SimpleNamespace)
  ├─> If selection.needsStage2() (Stage 2 needed - deterministic check):
  │   ├─> aiService.callAiPlanning() → AiResponse
  │   └─> parseJsonWithModel() → Update selection with parameters ✅
  └─> actionExecutor.executeSingleAction() → ActionResult
      └─> methodAi.process(parameters) → ActionResult
          └─> aiService.callAiContent(prompt, contentParts=parameters.contentParts, ...) → AiResponse ✅
          Note: contentParts must already be extracted, via executeAction("document.extractContent", ...), before this method is called
  ↓
_observeBuild() → Observation
  ↓
_refineDecide() → ReviewResult

Call Depth: 6-7 levels

Key Improvements:

  • Extraction is separate action (not inside AI call)
  • AI receives ContentParts (already extracted)
  • All returns are Pydantic models
  • No SimpleNamespace workaround
  • Execution state in ChatWorkflow (currentRound, currentTask, currentAction)
  • Clear hierarchy: Round → Task → Action
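The following sketch shows execution state kept on the workflow object. A stdlib dataclass stands in for the Pydantic ChatWorkflow. The integer counters and the rule that starting a higher level resets the lower ones are assumptions chosen to match the Round → Task → Action hierarchy. Because execution is sequential, one mutable instance is enough. describePosition is a hypothetical callee.

```python
from dataclasses import dataclass


@dataclass
class ChatWorkflow:
    # Execution state lives on the workflow (stand-in for the Pydantic model)
    currentRound: int = 0
    currentTask: int = 0
    currentAction: int = 0

    def startRound(self) -> None:
        self.currentRound += 1
        self.currentTask = 0    # assumption: a new round restarts task numbering
        self.currentAction = 0

    def startTask(self) -> None:
        self.currentTask += 1
        self.currentAction = 0  # assumption: a new task restarts action numbering

    def startAction(self) -> None:
        self.currentAction += 1


def describePosition(workflow: ChatWorkflow) -> str:
    # Callees read state from the workflow instead of receiving round/task/action ints
    return f"round={workflow.currentRound} task={workflow.currentTask} action={workflow.currentAction}"
```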

Document Content Extraction Architecture

Two-Phase Extraction Model

Document content extraction is split into two distinct phases with different purposes and limitations:

Phase 1: Pure Content Extraction (No AI)

Purpose: Extract raw content from documents without any AI processing.

Method: extractionService.extractContent(documents, options)

Characteristics:

  • No size limitations - Can extract complete documents regardless of size
  • No AI calls - Pure content extraction using format-specific extractors
  • Creates ContentPart objects - Structured content parts ready for processing
  • No chunking - Full document content is extracted as-is

Output: List[ContentExtracted] where each contains List[ContentPart]

Example:

# Phase 1: Pure extraction (no AI, no size limits)
extractionResult = extractionService.extractContent(
    documents=chatDocuments,
    options=ExtractionOptions(
        # No maxSize needed - extracts everything
        chunkAllowed=False  # No chunking in pure extraction
    )
)
# Returns: List[ContentExtracted] with full ContentParts

Phase 2: AI-Based Content Extraction (With Intelligent Model-Aware Chunking)

Purpose: Process extracted content with AI, using intelligent chunking based on selected AI model capabilities.

Method: extractionService.processDocumentsPerChunk(documents, prompt, aiObjects, options)

Characteristics:

  • Model-aware chunking - Chunk size calculated dynamically based on selected model
  • No fixed sizes - Only model's contextLength and maxTokens are used as limits
  • Per-call model selection - Each AI call can use a different model (failover mechanism)
  • Dynamic chunking - Chunking recalculated for each model in failover chain
  • Intelligent merging - Chunk results merged using sophisticated merging system

Chunking Calculation (per model):

# For each AI call, chunking is calculated dynamically based on:
# 1. Model's context window size (known from model configuration)
# 2. Input prompt size (known at call time)
# 3. Model's max output tokens (known from model configuration)

modelContextTokens = model.contextLength      # Model's total context window (known)
modelMaxOutputTokens = model.maxTokens        # Model's max output tokens (known)
promptSize = len(prompt.encode('utf-8'))     # Input prompt size in bytes (known at call time)

# Calculate prompt tokens (approximate: 1 token ≈ 4 bytes)
promptTokens = promptSize / 4

# Reserve tokens for:
systemMessageTokens = 10                      # System message overhead
outputTokens = modelMaxOutputTokens           # Output reservation (known from model)
messageOverheadTokens = 100                   # JSON/message structure overhead

totalReservedTokens = (promptTokens + systemMessageTokens +
                       messageOverheadTokens + outputTokens)

# Calculate remaining context available for content:
remainingContextTokens = modelContextTokens - totalReservedTokens

# Available for content (80% safety margin to avoid context overflow):
availableContentTokens = int(remainingContextTokens * 0.8)
availableContentBytes = availableContentTokens * 4  # Convert back to bytes (1 token ≈ 4 bytes)

# Chunk size (70% of available for text, 80% for images):
# - Text: 70% to leave room for response formatting
# - Images: 80% as images are more compact in token representation
textChunkSize = int(availableContentBytes * 0.7)
imageChunkSize = int(availableContentBytes * 0.8)

# Key: Chunking is calculated per-call based on:
# - Model's known context window size
# - Actual prompt size at call time
# - Remaining context = contextLength - promptTokens - reservedTokens
# No fixed sizes - adapts to each specific call context
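The same calculation as a self-contained, runnable function. The constants mirror the values above, but the ModelInfo class and the function name are hypothetical. Integer arithmetic replaces the float multiplications so the results are exact. Returning (0, 0) when the prompt plus reservations already fill the context window is an added guard; the description above does not specify this case.

```python
from dataclasses import dataclass
from typing import Tuple

BYTES_PER_TOKEN = 4            # approximation: 1 token ≈ 4 bytes
SYSTEM_MESSAGE_TOKENS = 10     # system message overhead
MESSAGE_OVERHEAD_TOKENS = 100  # JSON/message structure overhead


@dataclass
class ModelInfo:
    name: str
    contextLength: int  # total context window in tokens
    maxTokens: int      # max output tokens (fully reserved for the response)


def calculateChunkSizes(model: ModelInfo, prompt: str) -> Tuple[int, int]:
    """Return (textChunkSize, imageChunkSize) in bytes for one call."""
    promptTokens = len(prompt.encode("utf-8")) // BYTES_PER_TOKEN
    totalReservedTokens = (promptTokens + SYSTEM_MESSAGE_TOKENS
                           + MESSAGE_OVERHEAD_TOKENS + model.maxTokens)
    remainingContextTokens = model.contextLength - totalReservedTokens
    if remainingContextTokens <= 0:
        return 0, 0  # guard: nothing left for content with this model
    availableContentTokens = remainingContextTokens * 8 // 10  # 80% safety margin
    availableContentBytes = availableContentTokens * BYTES_PER_TOKEN
    textChunkSize = availableContentBytes * 7 // 10   # 70% for text
    imageChunkSize = availableContentBytes * 8 // 10  # 80% for images
    return textChunkSize, imageChunkSize
```

With these constants, a 128k-token model that reserves 4,096 output tokens and receives a 100-token prompt gets a text chunk of 277,074 bytes. At 4 bytes per token, that is roughly 69k tokens.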

Processing Flow (Pipeline Architecture):

# Phase 2: AI processing with model-aware pipeline chunking
for each ContentPart:
    # Initialize pipeline state
    processedChunks = []  # Already processed chunks
    remainingContent = contentPart.data  # Content not yet chunked/processed
    currentModel = None
    
    while remainingContent:
        1. Model Selection: Select best model for remaining content
           - Uses: operationType, priority, content type, etc.
           - If model changed: Recalculate chunk size for NEW model
        
        2. Calculate chunk size for CURRENT model:
           - Uses: model.contextLength, model.maxTokens, prompt size
           - Calculates: availableContentBytes for THIS model
        
        3. Create NEXT chunk on-demand (lazy chunking):
           - Only chunk the next portion needed (not entire content)
           - Chunk size based on current model's capabilities
           - Extract chunk from remainingContent
        
        4. Process chunk with AI:
           - Call AI with current model and next chunk
           - Store result in processedChunks
        
        5. Update pipeline state:
           - Remove processed chunk from remainingContent
           - Mark chunk as processed
        
        6. If model fails:
           - Select next model from failover list
           - Recalculate chunk size for NEW model (may have different contextLength!)
           - Continue with new model (remainingContent unchanged)
    
    7. Merge all processed chunks:
       - Merge processedChunks using intelligent merging
       - Return merged result

Pipeline Architecture:

  • Lazy chunking - Only next chunk is created on-demand, not entire content
  • Pipeline state - Tracks processed chunks and remaining content separately
  • On-demand creation - Chunk created only when needed for processing
  • Model-aware per chunk - Each chunk size calculated for current model
  • Efficient memory usage - No need to store all chunks in memory at once
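A minimal sketch of the per-part pipeline loop. It assumes an ordered failover list in which each Model already carries the chunk size computed for it, and that callModel raises ModelCallError on a failure such as a rate limit. Model, ModelCallError, callModel and merge are hypothetical names. Sizes are in characters rather than bytes. Model selection is simplified to "stay on the current model until it fails". An offset into the original string replaces an explicit remainingContent copy, so only the next chunk is ever sliced out.

```python
from dataclasses import dataclass
from typing import Callable, List, Sequence


class ModelCallError(Exception):
    """Raised by callModel when a model call fails and failover should happen."""


@dataclass
class Model:
    name: str
    chunkSize: int  # chunk size precomputed for this model and prompt (characters)


def processPartPipeline(
    content: str,
    models: Sequence[Model],
    callModel: Callable[[Model, str], str],
    merge: Callable[[List[str]], str] = "\n".join,
) -> str:
    processedChunks: List[str] = []
    offset = 0      # content[offset:] is the remainingContent
    modelIndex = 0
    while offset < len(content):
        if modelIndex >= len(models):
            raise RuntimeError("All models in the failover list failed")
        model = models[modelIndex]
        if model.chunkSize <= 0:
            modelIndex += 1  # prompt leaves no room for content with this model
            continue
        # Lazy chunking: cut only the next chunk, sized for the CURRENT model
        chunk = content[offset:offset + model.chunkSize]
        try:
            result = callModel(model, chunk)
        except ModelCallError:
            modelIndex += 1  # failover: next model, same offset, new chunk size
            continue
        processedChunks.append(result)
        offset += len(chunk)
    return merge(processedChunks)
```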

Key Principles:

  • Dynamic calculation - Chunk size calculated per call based on:
    • Model's contextLength (known)
    • Model's maxTokens (known)
    • Input prompt size (known at call time)
    • Remaining context = contextLength - promptTokens - reservedTokens
  • No fixed chunk sizes - Only model capabilities and prompt size determine chunk size
  • Per-call model selection - Each AI call can use different model
  • Dynamic recalculation - Chunking recalculated when model changes (failover)
  • Model capabilities as limits - Only contextLength and maxTokens are hard limits
  • Intelligent failover - If model fails, next model's capabilities determine new chunking
  • Pipeline processing - Only next chunk created, not entire content chunked upfront

Example (Pipeline Chunking):

# Phase 2: AI processing (with intelligent model-aware pipeline chunking)
aiResult = await extractionService.processDocumentsPerChunk(
    documents=chatDocuments,
    prompt="Extract key information",
    aiObjects=aiObjects,
    options=AiCallOptions(operationType=OperationTypeEnum.DATA_EXTRACT)
)
# Internally (Pipeline Architecture):
# - Extracts content (Phase 1) without chunking
# - For each ContentPart (e.g., a text part of ~500k tokens, roughly 2MB at 1 token ≈ 4 bytes):
#   Pipeline State:
#     processedChunks = []
#     remainingContent = ~500k tokens
#
#   Iteration 1:
#     - Selects model: GPT-4 (128k context)
#     - Calculates chunk size: (128k - reserved) * 0.8 * 0.7 ≈ 70k tokens (≈ 280KB of text)
#     - Creates NEXT chunk on-demand: chunk1 = ~70k tokens from remainingContent
#     - Processes chunk1 with GPT-4
#     - processedChunks = [chunk1_result]
#     - remainingContent = ~430k tokens
#
#   Iteration 2:
#     - Still using GPT-4
#     - Calculates chunk size: ~70k tokens (same model)
#     - Creates NEXT chunk: chunk2 = ~70k tokens from remainingContent
#     - Processes chunk2 with GPT-4
#     - processedChunks = [chunk1_result, chunk2_result]
#     - remainingContent = ~360k tokens
#
#   Iteration 3:
#     - GPT-4 fails (rate limit)
#     - Selects next model: Claude (200k context)
#     - Recalculates chunk size: (200k - reserved) * 0.8 * 0.7 ≈ 110k tokens (≈ 440KB of text)
#     - Creates NEXT chunk: chunk3 = ~110k tokens from remainingContent (larger!)
#     - Processes chunk3 with Claude
#     - processedChunks = [chunk1_result, chunk2_result, chunk3_result]
#     - remainingContent = ~250k tokens
#   
#   Iteration 4-N:
#     - Continues with Claude, creating chunks on-demand
#     - Each chunk size based on Claude's capabilities
#   
#   Final:
#     - Merges all processedChunks
#     - Returns merged result

Benefits:

  • Optimal resource usage - Uses full model capabilities, no arbitrary limits
  • Adaptive to model changes - Automatically adapts when model changes (failover)
  • No wasted capacity - Chunking based on actual model limits, not fixed sizes
  • Robust failover - Different models can have different chunk sizes
  • Memory efficient - Only one chunk in memory at a time (pipeline architecture)
  • On-demand processing - Chunks created only when needed, not all upfront
  • Resumable - Pipeline state allows resuming after failures without re-chunking
  • Model-adaptive chunking - Chunk size recalculated per model, even mid-processing

Architecture Principles

ContentParts-Only Approach

Key Principle: All AI methods only accept contentParts (already-extracted content). Extraction must be done separately using the document.extractContent action.

Workflow:

  1. Extract documents: executeAction("document.extractContent", ...) → ActionResult
  2. Get ContentParts: extractionResult.documents[0].parts → List[ContentPart]
  3. Call AI: aiService.callAiContent(prompt, contentParts=contentParts, ...) → AiResponse

Benefits:

  • Clear separation: Extract → AI (2 separate steps)
  • Reusable extracted content
  • No extraction logic in AI service
  • Type-safe with Pydantic models

Document Persistence Architecture

Three-Phase Document Handling

Phase 1: Within Running Task (Standardized Format)

Flow: ActionResult → ActionResult (direct passing)

# Action 1 returns ActionResult
action1Result = await executeAction("document.extractContent", {...})
# Returns: ActionResult with ActionDocument containing ContentExtracted

# Action 2 receives ActionResult directly
action2Result = await executeAction("ai.process", {
    "contentParts": action1Result.documents[0].parts  # Direct access
})
# NO ChatDocuments created - documents passed directly

Key: Documents are ephemeral but available within the same task.


Phase 2: Between Tasks/Rounds (Persistence Required)

Flow: ActionResult → ChatMessage + ChatDocuments (persistence)

# Task completion
taskResult = TaskResult(
    taskId=task.id,
    actionResult=finalActionResult  # ActionResult with ActionDocument
)

# Persist for future reference
chatMessage = await persistTaskResult(
    taskResult=taskResult,
    context=context,
    workflow=workflow
)
# Creates: ChatMessage + ChatDocuments
# Sets: documentsLabel for docList: references

Why: Later tasks need to reference documents via docList:label or docList:messageId:label.

Lookup Flow:

# Later task references documents
documentList = DocumentReferenceList.from_string_list([
    "docList:msg_123:task1_results"  # Reference to previous task
])

# System looks up ChatMessage with matching documentsLabel
chatDocuments = getChatDocumentsFromDocumentList(documentList)
# Searches workflow.messages for documentsLabel match

Phase 3: User Delivery (ChatMessage + ChatDocuments)

Flow: ActionResult → ChatMessage (user language) + ChatDocuments

# User-facing workflow
chatMessage = await deliverToUser(
    actionResult=actionResult,
    context=context,
    workflow=workflow,
    isUserFacing=True
)
# Creates: ChatMessage (user language) + ChatDocuments

# Process automation (minimal message)
chatMessage = await persistDocumentsForAutomation(
    actionResult=actionResult,
    context=context,
    workflow=workflow
)
# Creates: ChatMessage (no user text, just storage) + ChatDocuments

Key:

  • User-facing: Full ChatMessage with user-friendly text in user language
  • Process automation: Minimal ChatMessage (system role, no user text) but documents still persisted
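The two delivery paths can be sketched as one helper, assuming a simplified ChatMessage with role, message and documents fields. The helper name and the "assistant" role for user-facing messages are hypothetical. The source only requires that automation messages use the system role with no user text while still persisting the documents.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ChatMessage:
    role: str
    message: str
    documentsLabel: str
    documents: List[str] = field(default_factory=list)


def buildDeliveryMessage(
    documentNames: List[str],
    documentsLabel: str,
    isUserFacing: bool,
    userText: str = "",
) -> ChatMessage:
    if isUserFacing:
        # Full message; userText is assumed to be already in the user's language
        return ChatMessage("assistant", userText, documentsLabel, list(documentNames))
    # Process automation: no user-facing text, but documents are still persisted
    return ChatMessage("system", "", documentsLabel, list(documentNames))
```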

Service Dependency Architecture

Service Access Pattern

Methods (e.g., methodAi, methodOutlook, methodSharepoint):

  • Access services via self.services.xxx (no direct dependencies)
  • No dependency injection needed - services are provided through services object
  • Methods are stateless and reusable

Service Dependency Hierarchy:

aiService (independent)
  └─> No dependencies

generationService (independent)
  └─> No dependencies

extractionService (independent)
  └─> No dependencies

chatService (depends on aiService, generationService)
  ├─> Uses: aiService (for AI calls)
  └─> Uses: generationService (for document rendering)

workflowProcessor (depends on all services)
  ├─> Uses: chatService
  ├─> Uses: aiService
  ├─> Uses: generationService
  └─> Uses: extractionService

Key Principles:

  • Unidirectional dependencies: Services only depend on lower-level services
  • No circular dependencies: Dependency graph is acyclic
  • Clear hierarchy: Base services (ai, generation, extraction) are independent
  • Methods are stateless: Access services through self.services object
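The "acyclic" property can be checked mechanically. This sketch encodes the hierarchy above as a service → dependencies mapping. It uses Python's standard graphlib.TopologicalSorter (Python 3.9+), which raises CycleError on a cycle and otherwise yields an order that puts dependencies first. Running this as a startup check is only a suggestion; the architecture does not prescribe it.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set

# service -> services it depends on (taken from the hierarchy above)
SERVICE_DEPENDENCIES: Dict[str, Set[str]] = {
    "aiService": set(),
    "generationService": set(),
    "extractionService": set(),
    "chatService": {"aiService", "generationService"},
    "workflowProcessor": {"chatService", "aiService", "generationService", "extractionService"},
}


def initializationOrder(dependencies: Dict[str, Set[str]]) -> List[str]:
    # static_order() lists dependencies before dependents and raises CycleError on a cycle
    return list(TopologicalSorter(dependencies).static_order())
```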

Operation Types

Fixed Enum: Operation types are defined in OperationTypeEnum and are not configurable.

  • All operation types are enumerated in code, so they are known before runtime and visible to static type checkers
  • Adding new operation types requires code changes (enum update)
  • This ensures type safety and prevents runtime configuration errors
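A sketch of the fixed-enum approach. Only DATA_EXTRACT appears in this document; its string value and the PLANNING member are hypothetical placeholders. Because the values live in code, an unknown operation type fails immediately with ValueError and is never accepted from configuration.

```python
from enum import Enum


class OperationTypeEnum(str, Enum):
    DATA_EXTRACT = "dataExtract"  # member from this document; value is hypothetical
    PLANNING = "planning"         # hypothetical member for illustration


def parseOperationType(value: str) -> OperationTypeEnum:
    # Lookup by value; unknown strings raise ValueError
    return OperationTypeEnum(value)
```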

Benefits Summary

Code Quality

  • Type safety: All parameters/returns are Pydantic models
  • No workarounds: Removed SimpleNamespace hack
  • Clean signatures: Fewer parameters, state in ChatWorkflow
  • Structured parsing: No manual JSON find/replace
  • Action registry: Actions defined with parameter models for validation

Architecture

  • Modular: Extraction separate from AI
  • Reusable: Extract once, use multiple times
  • Simplified: Shorter call chains (4-7 levels)
  • Maintainable: Clear separation of concerns
  • Service hierarchy: Clear dependency structure, no circular dependencies
  • Sequential execution: Single workflow instance, no parallel execution

Performance

  • Faster AI calls: No extraction overhead
  • Better caching: Reusable extracted content
  • Efficient: Dynamic chunking based on model capabilities and prompt size
  • Model-aware: Chunking adapts to each model's context window

Developer Experience

  • Better IDE support: Autocomplete, type hints
  • Error prevention: Static type checking in the IDE, plus runtime Pydantic validation of parameters
  • Clearer code: Typed models vs Dict/str
  • Action co-location: Action functions and parameter models defined together