wiki/appdoc/doc_dev_workflow.md

36 KiB

Workflow System - Developer Documentation

Overview

The PowerOn workflow system is a multi-layered architecture designed for intelligent task automation and document processing. The system processes user requests through a structured pipeline involving workflow orchestration, AI-powered task planning, action execution via specialized methods, and comprehensive service integration.

System Architecture

The workflow system is built on six distinct layers, each serving a specific purpose:

┌─────────────────────────────────────────────────────────────┐
│                     Workflow Layer                           │
│  (workflows/) - Orchestration & Execution Management         │
└─────────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│                    Method Layer                              │
│  (methods/) - Action Implementation & Parameters            │
└─────────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│                    Service Layer                             │
│  (services/) - Business Logic & Feature Catalog              │
└─────────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│                  Interface Layer                             │
│  (interfaces/) - Standardized Component APIs                 │
└─────────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│                   Connector Layer                             │
│  (connectors/) - External System Integration                 │
└─────────────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────────────┐
│               Datamodels & AI Core                           │
│  (datamodels/, aicore/) - Data Models & AI Orchestration     │
└─────────────────────────────────────────────────────────────┘

Layer 1: Workflow System (@workflows/)

Purpose

The workflow layer is the top-level orchestration system that manages task planning, execution, and state management. It coordinates between different workflow modes (React, Actionplan, etc.) and handles the complete workflow lifecycle.

Key Components

workflowManager.py

Entry point for workflow operations.

Key Functions:

  • workflowStart(): Initialize and start a workflow instance
  • workflowStop(): Stop an active workflow
  • _workflowProcess(): Main workflow execution loop
  • _planTasks(): Generate task plans using AI
  • _executeTasks(): Execute tasks sequentially

Workflow States:

  • running: Workflow is actively processing
  • stopped: User manually stopped the workflow
  • completed: Workflow finished successfully
  • failed: Workflow encountered an error

Example:

# Start a new workflow
workflow = await workflowManager.workflowStart(
    userInput=UserInputRequest(prompt="Analyze sales data"),
    workflowMode="React"  # or "Actionplan"
)

Workflow Types

  1. React Mode: Iterative task execution with up to 5 refinement steps

    • Best for exploratory tasks that may require multiple passes
    • Uses maxSteps: 5 configuration
  2. Actionplan Mode: Single-pass task execution with full action planning upfront

    • Best for well-defined tasks with clear objectives
    • Single execution path

Workflow Lifecycle:

start → analyzeUserInput → planTasks → executeTasks → generateResults → complete

Processing Layer (processing/)

The processing layer handles mode-specific execution logic:

  • workflowProcessor.py: Main processor delegating to mode implementations
  • modes/modeBase.py: Base class for all workflow modes
  • modes/modeReact.py: React mode implementation
  • modes/modeActionplan.py: Actionplan mode implementation

Task Execution Flow:

# Task planning
taskPlan = await processor.generateTaskPlan(userInput, workflow)

# Task execution
for task in taskPlan.tasks:
    result = await processor.executeTask(task, workflow, context)
    # Process results and handover

Task and Action Planning

The system operates on a two-level planning hierarchy:

  1. Task Planning: High-level objective breakdown

    • Each task represents a major objective
    • Tasks are planned upfront (Actionplan) or iteratively (React)
  2. Action Planning: Low-level operation sequencing

    • Actions are specific operations to accomplish a task
    • Actions are executed via method implementations

TaskContext Structure:

TaskContext(
    task_step: TaskStep,
    workflow: ChatWorkflow,
    workflow_id: str,
    previous_results: List[str],
    improvements: List[str],
    retry_count: int,
    # ... other context fields
)

Imports Constraint

Workflows layer imports ONLY from:

  • modules.datamodels.* - Data models
  • modules.services.* - Service layer
  • modules.workflows.* - Internal workflow components

NO direct imports from:

  • Interfaces
  • Connectors
  • AI core (use via services.ai)

Layer 2: Methods (@methods/)

Purpose

Methods implement executable actions. They define the interface between workflow planning and service execution, providing a standard way to perform operations with parameter handling.

Method Structure

All methods inherit from MethodBase:

class MethodBase:
    def __init__(self, services):
        self.services = services
        self.name = "method_name"
        self.description = "Method description"
    
    @property
    def actions(self) -> Dict[str, Dict]:
        # Auto-discovers @action decorated methods
        pass

Action Decorator

Methods expose executable actions using the @action decorator:

class MethodAi(MethodBase):
    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Process a user prompt with AI.
        
        Parameters:
        - aiPrompt (str, required): Instruction for the AI
        - documentList (list, optional): Document references
        - resultType (str, optional): Output format (txt, json, etc.)
        """
        aiPrompt = parameters.get("aiPrompt")
        documentList = parameters.get("documentList", [])
        resultType = parameters.get("resultType", "txt")
        
        # Call service layer
        result = await self.services.ai.callAiDocuments(...)
        
        # Return ActionResult
        return ActionResult.isSuccess(documents=[...])

Parameter Principles

  1. Type Safety: Parameters are typed and validated
  2. Required vs Optional: Documented in docstrings
  3. Default Values: Optional parameters have defaults
  4. Service Delegation: Methods delegate to services layer

What Belongs in Methods vs Services

Methods handle:

  • Parameter extraction and validation
  • Action-specific orchestration
  • Document packaging/unpackaging
  • Error handling for action execution

Services handle:

  • Core business logic
  • Standardized operations
  • Data transformation
  • Cross-cutting concerns (logging, stats)

Example Method Implementation

class MethodOutlook(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "outlook"
        self.description = "Microsoft Outlook operations"
    
    @action
    async def readMails(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Read emails from Outlook mailbox.
        
        Parameters:
        - mailboxName (str, required): Mailbox to read from
        - filterCriteria (dict, optional): Email filter criteria
        """
        # Extract parameters
        mailboxName = parameters.get("mailboxName")
        filterCriteria = parameters.get("filterCriteria", {})
        
        # Delegate to service layer
        emails = await self.services.outlook.readEmails(
            mailbox=mailboxName,
            filter=filterCriteria
        )
        
        # Package results as documents
        documents = [ActionDocument(
            documentName=f"email_{i}.json",
            documentData=email.to_dict(),
            mimeType="application/json"
        ) for i, email in enumerate(emails)]
        
        return ActionResult.isSuccess(documents=documents)

Available Methods

  1. methodAi: AI processing and generation

    • process: General AI processing with documents
    • webResearch: Web research and crawling
    • generateImage: AI image generation
  2. methodOutlook: Microsoft Outlook operations

    • readMails: Read emails
    • sendMail: Send emails
    • manageCalendar: Calendar operations
  3. methodSharepoint: SharePoint operations

    • readFiles: Read files
    • writeFiles: Write files
    • manageFolders: Folder management

Imports Constraint

Methods layer imports ONLY from:

  • modules.datamodels.* - Data models
  • modules.services.* - Service layer
  • modules.workflows.methods.* - Base classes

NO direct imports from:

  • Interfaces
  • Connectors
  • AI core
  • Workflow orchestration

Layer 3: Services (@services/)

Purpose

The services layer provides a comprehensive catalog of business features. All services follow a consistent structure and are accessible via self.services.xxx.

Services Architecture

Each service is structured as:

serviceX/
  - mainServiceX.py      # Public API
  - subCoreX.py          # Core business logic
  - subDocumentX.py     # Document-specific logic
  - subSharedX.py        # Shared utilities

Service Catalog

1. serviceAi - AI Operations

Purpose: AI calls, model selection, document processing

Key Functions:

# AI document processing
await services.ai.callAiDocuments(
    prompt="Generate report",
    documents=[...],
    options=AiCallOptions(...)
)

# AI planning calls
await services.ai.callAiPlanning(
    prompt="Plan tasks",
    placeholders=None
)

# Image operations
await services.ai.readImage(prompt, imageData, mimeType)
await services.ai.generateImage(prompt, size, quality, style)

Sub-modules:

  • subCoreAi: Core AI operations
  • subDocumentProcessing: Document chunking and processing
  • subDocumentGeneration: Single/multi-file generation
  • subSharedAiUtils: Shared utilities

2. serviceWorkflow - Workflow Management

Purpose: Workflow state, progress tracking, message handling

Key Functions:

# Progress tracking
services.workflow.progressLogStart(operationId, stage, step, details)
services.workflow.progressLogUpdate(operationId, progress, message)
services.workflow.progressLogFinish(operationId, success)

# Message handling
services.workflow.storeMessageWithDocuments(workflow, messageData, documents)
services.workflow.getChatDocumentsFromDocumentList(documentList)

# Workflow state
services.workflow.storeLog(workflow, logEntry)
services.workflow.storeWorkflowStat(workflow, aiResponse, process)

3. serviceExtraction - Document Extraction

Purpose: Extracting content from various document formats

Features:

  • Multiple extractors (PDF, DOCX, XLSX, images, etc.)
  • Chunking for large documents
  • Merging strategies for AI processing

Key Functions:

# Extract content
extracted = services.extraction.extractContent(
    documents=[...],
    options=ExtractionOptions(...)
)

# Merge AI results
merged = services.extraction.mergeAiResults(
    extractedContent=[...],
    aiResults=[...],
    strategy=MergeStrategy(...)
)

Plug & Play Extractors:

  • extractorPdf.py: PDF extraction
  • extractorDocx.py: DOCX extraction
  • extractorImage.py: Image OCR
  • extractorXlsx.py: Excel extraction
  • extractorCsv.py: CSV parsing
  • And more...

Chunkers:

  • chunkerText.py: Text chunking
  • chunkerTable.py: Table chunking
  • chunkerImage.py: Image chunking
  • chunkerStructure.py: Structural chunking

Mergers:

  • mergerText.py: Text merging
  • mergerTable.py: Table merging
  • mergerDefault.py: Default merging

4. serviceGeneration - Document Generation

Purpose: Generating documents in various formats

Features:

  • Multiple renderers (JSON, HTML, PDF, DOCX, etc.)
  • Template support
  • Schema validation

Key Functions:

# Process action results into documents
documents = services.generation.processActionResultDocuments(
    action_result,
    action,
    workflow
)

# Create documents from action results
created = services.generation.createDocumentsFromActionResult(
    action_result,
    action,
    workflow,
    message_id
)

Plug & Play Renderers:

  • rendererJson.py: JSON rendering
  • rendererHtml.py: HTML rendering
  • rendererPdf.py: PDF generation
  • rendererDocx.py: DOCX generation
  • rendererText.py: Plain text
  • And more...

5. serviceWeb - Web Operations

Purpose: Web research, crawling, content extraction

Key Functions:

# Perform web research
research = await services.web.performWebResearch(
    prompt="Find information",
    urls=[...],
    country="us",
    language="en",
    researchDepth="general"
)

6. serviceTicket - Ticket System Integration

Purpose: Integration with ticket systems (Jira, ClickUp, etc.)

7. serviceSharepoint - SharePoint Operations

Purpose: SharePoint file and folder management

8. serviceNeutralization - Data Normalization

Purpose: Normalizing data from different sources

9. serviceNormalization - Additional Normalization

Purpose: Extended normalization features

10. serviceUtils - Utility Functions

Purpose: Timestamps, formatting, validation

Key Functions:

# Get UTC timestamp
timestamp = services.utils.timestampGetUtc()

# Other utility functions
...

Services Access Pattern

All services are accessed via self.services.xxx:

# In a method
class MyMethod(MethodBase):
    @action
    async def myAction(self, parameters):
        # Access any service
        result = await self.services.ai.callAiDocuments(...)
        data = self.services.workflow.getFileInfo(fileId)
        timestamp = self.services.utils.timestampGetUtc()
        
        return ActionResult.isSuccess(...)

Plug & Play Document Processing

The system supports plug & play extractors and renderers for document transformation:

Extraction Flow:

Source Document (PDF, DOCX, etc.)
  → Extractor (registered by MIME type)
  → Chunker (if document too large)
  → ContentExtracted (standard structure)

Generation Flow:

ContentExtracted (standard structure)
  → AI Processing
  → Merger (combine AI results)
  → Renderer (convert to target format)
  → Destination Document (JSON, HTML, PDF, etc.)

Internal Data Structure:

ContentExtracted:
  - id: str
  - parts: List[ContentPart]
    - label: str
    - data: str
    - metadata: ContentMetadata
      - size, pages, mimeType, etc.

# Standard JSON structure for transformation
{
  "documentId": "...",
  "parts": [
    {
      "label": "text_content",
      "data": "...",
      "metadata": {...}
    }
  ]
}

Transformation Matrix:

Any Source Format (PDF, DOCX, XLSX, images, etc.)
  ↓
Standard JSON (ContentExtracted)
  ↓
Any Destination Format (JSON, HTML, PDF, DOCX, etc.)

This enables transforming ANY document format to ANY other format through the standardized JSON intermediate structure.

Imports Constraint

Services layer imports ONLY from:

  • modules.datamodels.* - Data models
  • modules.interfaces.* - Interface layer
  • modules.aicore.* - AI core (for AI services)
  • modules.services.* - Other services

NO direct imports from:

  • Connectors (use via interfaces)
  • Workflow orchestration
  • Methods

Layer 4: Interfaces (@interfaces/)

Purpose

Interfaces provide standardized APIs for accessing different data sources and components. They abstract away implementation details and enable pluggable connector systems.

Interface Structure

Each interface defines:

  1. Standardized Methods: Common operations across all implementations
  2. Data Models: Interface-specific data models
  3. Access Control: User permission management

Available Interfaces

1. interfaceDbChatAccess & interfaceDbChatObjects

Purpose: Chat system database operations

Key Classes:

  • ChatWorkflow: Workflow instances
  • ChatMessage: Messages in a workflow
  • ChatDocument: Documents attached to messages
  • ChatLog: Workflow logs
  • ChatStat: Workflow statistics

Key Functions:

# Workflow operations
workflow = interface.createWorkflow(workflowData)
workflow = interface.getWorkflow(workflowId)
interface.updateWorkflow(workflowId, updateData)

# Message operations
message = interface.createMessage(messageData)
messages = interface.getMessages(workflowId)

# Document operations
document = interface.createDocument(documentData)
documents = interface.getDocuments(messageId)

2. interfaceDbAppAccess & interfaceDbAppObjects

Purpose: Application database operations

Key Classes:

  • UserInDB: User accounts
  • VoiceSettings: Voice configuration
  • Connection: External connections

3. interfaceDbComponentAccess & interfaceDbComponentObjects

Purpose: Component database operations (files, data storage)

Key Classes:

  • FileItem: File metadata
  • FileData: File content

Key Functions:

# File operations
fileItem = interface.createFile(name, mimeType, content)
fileData = interface.getFileData(fileId)
interface.updateFileData(fileId, content)

4. interfaceAiObjects

Purpose: AI model management and calls

Key Functions:

# AI calls
response = await interface.call(request)

# Model selection
model = interface.getModelsByOperationType(operationType)

5. interfaceTicketObjects

Purpose: Ticket system interface

Key Classes:

  • TicketBase: Base ticket interface
  • TicketFieldAttribute: Ticket field definitions

Interface Pattern

Interfaces use a factory pattern:

def getInterface(user: User, workflow: ChatWorkflow = None) -> InterfaceType:
    """Factory function that returns appropriate implementation."""
    return InterfaceImplementation(user, workflow)

Connector Integration

Interfaces can have multiple connector implementations:

interfaceDbChat
  ├─ connectorDbPostgre.py
  ├─ connectorDbJson.py
  └─ (future connectors)

Each connector:

  • Implements interface requirements
  • Handles data source specifics
  • Returns standardized data models
  • Ensures interface compatibility

Imports Constraint

Interfaces layer imports:

  • modules.datamodels.* - Data models
  • Database/connector specific libraries

NO direct imports from:

  • Services
  • Workflows
  • Methods

Layer 5: Connectors (@connectors/)

Purpose

Connectors integrate external systems and services. They implement interface contracts and provide standardized access to diverse data sources.

Naming Convention

Connector naming follows the pattern:

connector{InterfaceType}{Provider}.py

Examples:

  • connectorDbPostgre.py - PostgreSQL database connector
  • connectorDbJson.py - JSON file connector
  • connectorTicketsJira.py - Jira ticket connector
  • connectorTicketsClickup.py - ClickUp connector
  • connectorVoiceGoogle.py - Google Voice connector

Connector Structure

Every connector must:

  1. Implement Interface Contract:
class ConnectorTicketsJira(TicketBase):
    def __init__(self, *, apiUsername, apiToken, apiUrl, ...):
        # Initialize connector
        pass
    
    async def read_tasks(self, *, limit: int = 0) -> list[dict]:
        # Implement interface method
        pass
    
    async def create_task(self, taskData: dict) -> dict:
        # Implement interface method
        pass
  1. Ensure Standardization:
  • Return standardized data models
  • Handle authentication consistently
  • Provide error handling
  • Log operations
  1. Respect Interface Requirements:
  • Must implement all required methods
  • Must return data in expected format
  • Must handle errors gracefully

Connector Examples

Database Connectors

# PostgreSQL Connector
class ConnectorDbPostgre:
    async def executeQuery(self, query: str, params: dict):
        # Execute against PostgreSQL
        pass
# JSON File Connector
class ConnectorDbJson:
    async def executeQuery(self, query: str, params: dict):
        # Execute against JSON files
        pass

Ticket System Connectors

# Jira Connector
class ConnectorTicketsJira(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read Jira field attributes
        pass
    
    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read Jira tasks
        pass
# ClickUp Connector
class ConnectorTicketsClickup(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read ClickUp field attributes
        pass
    
    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read ClickUp tasks
        pass

Connector Registration

Connectors are registered dynamically:

# In interface
connectors = [
    ConnectorTicketsJira(credentials),
    ConnectorTicketsClickup(credentials)
]

Imports Constraint

Connectors layer imports:

  • modules.datamodels.* - Data models
  • modules.interfaces.* - Interface contracts
  • External libraries (aiohttp, etc.)

NO direct imports from:

  • Services
  • Workflows
  • Methods

Layer 6: Data Models (@datamodels/)

Purpose

Data models define the core data structures used throughout the system. They use Pydantic for validation and type safety.

Key Data Models

Chat Models (datamodelChat.py)

ChatWorkflow:
  - id: str
  - name: str
  - status: str
  - workflowMode: str  # "React" or "Actionplan"
  - currentRound: int
  - currentTask: int
  - currentAction: int
  - totalTasks: int
  - totalActions: int
  - messages: List[ChatMessage]
  - logs: List[ChatLog]
  - stats: List[ChatStat]

ChatMessage:
  - id: str
  - workflowId: str
  - role: str  # "user" or "assistant"
  - message: str
  - documents: List[ChatDocument]
  - documentsLabel: str
  - roundNumber: int
  - taskNumber: int
  - actionNumber: int
  - taskProgress: str
  - actionProgress: str

TaskContext:
  - task_step: TaskStep
  - workflow: ChatWorkflow
  - workflow_id: str
  - previous_results: List[str]
  - available_documents: List[ChatDocument]
  - improvements: List[str]
  - retry_count: int

TaskStep:
  - objective: str
  - description: str
  - criteria: List[str]

TaskPlan:
  - tasks: List[TaskStep]
  - summary: str

ActionResult:
  - success: bool
  - documents: List[ActionDocument]
  - error: str (optional)

AI Models (datamodelAi.py)

AiModel:
  - name: str
  - displayName: str
  - connectorType: str
  - contextLength: int
  - qualityRating: int  # 1-10
  - speedRating: int   # 1-10
  - costPer1kTokensInput: float
  - costPer1kTokensOutput: float
  - operationTypes: List[OperationTypeRating]

AiCallOptions:
  - operationType: OperationTypeEnum
  - resultFormat: str
  - processingMode: ProcessingModeEnum
  - priority: PriorityEnum
  - modelName: str (optional)

AiCallRequest:
  - prompt: str
  - context: str (optional)
  - options: AiCallOptions
  - placeholders: List[PromptPlaceholder] (optional)

AiCallResponse:
  - content: str
  - modelName: str
  - priceUsd: float
  - processingTime: float
  - bytesSent: int
  - bytesReceived: int
  - errorCount: int

Extraction Models (datamodelExtraction.py)

ContentPart:
  - label: str
  - data: str
  - metadata: ContentMetadata
  - typeGroup: str

ContentExtracted:
  - id: str
  - parts: List[ContentPart]

ContentMetadata:
  - size: int
  - mimeType: str
  - pages: int (optional)
  - error: str (optional)

MergeStrategy:
  - mergeType: str
  - config: dict

Data Model Usage

Data models are imported across all layers:

# In any layer
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage
from modules.datamodels.datamodelAi import AiCallOptions

AI Core System (@aicore/)

Purpose

The AI core provides dynamic model selection based on KPIs, business requirements, and operation types. It harmonizes model parameters for optimal performance.

Key Components

aicoreModelRegistry.py

Purpose: Centralized model registry for all AI connectors

Key Functions:

# Discover and register connectors
connectors = modelRegistry.discoverConnectors()
modelRegistry.registerConnector(connector)

# Get available models
availableModels = modelRegistry.getAvailableModels()

# Get models by operation type
models = modelRegistry.getModelsByOperationType(operationType)

# Get specific model
model = modelRegistry.getModel(modelName)

aicoreModelSelector.py

Purpose: Intelligent model selection based on requirements

Selection Criteria:

  1. Operation Type: Primary filter (must match)
  2. Context Length: Must fit within model limits
  3. Prompt Size: Checks if prompt fits
  4. Processing Mode: Basic/Advanced/Detailed
  5. Priority: Speed/Quality/Cost/Balanced

Selection Algorithm:

def selectModel(prompt, context, options, availableModels):
    # Step 1: Filter by operation type
    operationFiltered = filterByOperationType(availableModels, options)
    
    # Step 2: Filter by prompt size
    promptFiltered = filterByPromptSize(operationFiltered, prompt)
    
    # Step 3: Calculate scores
    scoredModels = calculateScores(promptFiltered, prompt, context, options)
    
    # Step 4: Sort by score
    sortedModels = sortByScore(scoredModels)
    
    return sortedModels[0]  # Best model

Scoring Formula:

score = (
    operationTypeRating * 1000.0 +        # Primary
    sizeRating +                          # Context fit
    processingModeRating +                # Mode compatibility
    priorityRating                         # Speed/Quality/Cost
)

aicorePlugin*.py

AI connectors for different providers:

  • aicorePluginOpenai: OpenAI models (GPT-4, etc.)
  • aicorePluginAnthropic: Claude models
  • aicorePluginPerplexity: Perplexity AI
  • aicorePluginTavily: Tavily search
  • aicorePluginInternal: Internal models

Connector Interface:

class BaseConnectorAi(ABC):
    def getModels(self) -> List[AiModel]:
        """Return all available models from this connector."""
        pass
    
    def getConnectorType(self) -> str:
        """Return connector type identifier."""
        pass

Dynamic Model Selection

Selection Process:

1. Analyze request (prompt, context, operation type)
2. Get all available models from registry
3. Filter by operation type (MUST match)
4. Filter by size constraints
5. Score each model
6. Select best model
7. Execute with failover to backup models

Failover Mechanism:

# Try models in priority order
for model in failoverModels:
    try:
        result = await callModel(model, prompt, context)
        return result
    except Exception:
        # Try next model
        continue

# If all models fail
raise Exception("All models failed")

Operation Types

Models are rated for different operation types:

  • GENERATE: Text generation
  • ANALYZE: Text analysis
  • EXTRACT: Information extraction
  • PLAN: Task planning
  • IMAGE_GENERATE: Image generation
  • IMAGE_ANALYZE: Image analysis

Rating System:

  • Each model has ratings per operation type (1-10)
  • Higher rating = better for that operation
  • Rating * 1000 is primary sorting criteria

Processing Modes

  • BASIC: Simple operations, fast processing
  • ADVANCED: Moderate complexity
  • DETAILED: High complexity, thorough analysis

Compatibility:

  • Models can handle their mode or downgrade
  • Mode compatibility affects scoring

Priority Handling

  • BALANCED: Default, considers all factors
  • SPEED: Optimize for speed
  • QUALITY: Optimize for quality
  • COST: Optimize for cost

Priority Scoring:

if priority == "SPEED":
    score += model.speedRating / 10.0
elif priority == "QUALITY":
    score += model.qualityRating / 10.0
elif priority == "COST":
    score += costOptimization(model)

Harmonized Parameters

The system harmonizes parameters across different model APIs:

  • Temperature/sampling parameters
  • Max tokens settings
  • Context length handling
  • Response format handling

Usage Example

# In service layer
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector

# Get available models
availableModels = modelRegistry.getAvailableModels()

# Create options
options = AiCallOptions(
    operationType=OperationTypeEnum.GENERATE,
    resultFormat="json",
    processingMode=ProcessingModeEnum.DETAILED,
    priority=PriorityEnum.QUALITY
)

# Select best model
selectedModel = modelSelector.selectModel(
    prompt=prompt,
    context=context,
    options=options,
    availableModels=availableModels
)

# Execute with failover
response = await executeWithFailover(
    models=modelSelector.getFailoverModelList(...),
    prompt=prompt,
    context=context,
    options=options
)

Data Flow

Complete Request Flow

User Input
  ↓
WorkflowManager.workflowStart()
  ↓
WorkflowProcessor.generateTaskPlan()
  ↓
AI Planning Call (services.ai.callAiPlanning)
  ↓
Task Plan Created
  ↓
For each Task:
  ActionPlan Generated
  ↓
  For each Action:
    Method Action Executed
      ↓
    Service Layer Called
      ↓
    Interface Layer Called
      ↓
    Connector Executes
      ↓
    Results Returned
  ↓
Results Aggregated
  ↓
Final Response Generated

Document Processing Flow

Source Document (PDF, DOCX, etc.)
  ↓
services.extraction.extractContent()
  ↓
Extractor (extractorPdf, extractorDocx, etc.)
  ↓
Chunker (if document too large)
  ↓
ContentExtracted (standard JSON structure)
  ↓
AI Processing (services.ai.callAiDocuments)
  ↓
Merger (merge AI results)
  ↓
Renderer (rendererHtml, rendererPdf, etc.)
  ↓
Destination Document (HTML, PDF, etc.)

Design Principles

1. Layered Architecture

Each layer has specific responsibilities and imports only from layers below it.

2. Standardization

Standardized data models, interfaces, and method signatures enable plug & play components.

3. Dynamic Selection

AI models selected dynamically based on requirements, not hardcoded.

4. Failover Mechanism

Automatic failover to backup models ensures reliability.

5. Progress Tracking

Complete progress tracking throughout workflow execution.

6. Document Transformation

Standardized JSON intermediate format enables any-to-any document transformation.

7. Modularity

Clear separation between workflows, methods, services, interfaces, and connectors.

8. Type Safety

Pydantic models ensure type safety and validation across the system.


Development Guidelines

Adding a New Method

  1. Create methodX.py in modules/workflows/methods/
  2. Inherit from MethodBase
  3. Implement @action decorated methods
  4. Follow parameter principles
  5. Delegate to services layer
class MethodX(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "x"
        self.description = "X operations"
    
    @action
    async def operation(self, parameters):
        # Extract parameters
        # Call services layer
        # Return ActionResult
        pass

Adding a New Service

  1. Create serviceX/ directory
  2. Implement mainServiceX.py with public API
  3. Add sub-modules as needed
  4. Register in services/__init__.py
# In services/__init__.py
from .serviceX.mainServiceX import ServiceX
self.x = PublicService(ServiceX(self))

Adding a New Connector

  1. Create connectorInterfaceXProvider.py in connectors/
  2. Implement interface contract
  3. Ensure standardization
  4. Handle errors gracefully
  5. Register in interface

Adding a New Extractor/Renderer

  1. Create extractorX.py or rendererX.py
  2. Register in ExtractorRegistry or RendererRegistry
  3. Implement interface:
    • extract(documentBytes, fileName, mimeType) -> ContentExtracted
    • or render(contentData, options) -> bytes

Error Handling

Workflow Errors

  • Caught at workflow processor level
  • Workflow status set to "failed"
  • Error logged and returned to user

Action Errors

  • Caught at method level
  • ActionResult returned with success=False
  • Error message in error field

Service Errors

  • Caught at service level
  • Logged with context
  • Re-raised or handled gracefully

Connector Errors

  • Caught at connector level
  • Retry with next connector (if applicable)
  • Error logged and escalated

Logging

Log Levels

  • INFO: Normal operation flow
  • DEBUG: Detailed debugging information
  • WARNING: Non-critical issues
  • ERROR: Critical errors

Log Structure

timestamp | level | module | message | context

Workflow Logs

self.services.workflow.storeLog(workflow, {
    "message": "Task completed",
    "type": "info",
    "status": "success",
    "progress": 100
})

Testing

Unit Tests

Test individual components in isolation.

Integration Tests

Test interaction between components.

Workflow Tests

Test complete workflow execution.

Mock Services

Use mock services for testing without external dependencies.


Performance Considerations

Async/Await

All I/O operations are async for optimal performance.

Lazy Initialization

Services initialized lazily to reduce startup time.

Caching

Model registry caches connector instances.

Chunking

Large documents chunked for efficient processing.

Failover

Automatic failover ensures reliability without sacrificing performance.


Security

Access Control

User permission checks at interface level.

Data Validation

Pydantic models validate all inputs.

Sanitization

Prompt content sanitized to prevent injection.

Encryption

Sensitive data encrypted at rest and in transit.


Conclusion

The PowerOn workflow system is a sophisticated, multi-layered architecture that provides:

  • Intelligent Task Automation: AI-powered task and action planning
  • Document Processing: Plug & play extractors and renderers for any format
  • Service Catalog: Comprehensive business feature catalog
  • Dynamic Model Selection: Optimal AI model selection based on requirements
  • Standardized Interfaces: Consistent APIs across all components
  • Extensibility: Easy to add new methods, services, connectors

The system emphasizes:

  • Separation of concerns
  • Standardization
  • Modularity
  • Type safety
  • Performance
  • Reliability