wiki/z-archive/appdoc/doc_dev_workflow.md

1412 lines
36 KiB
Markdown

# Workflow System - Developer Documentation
## Overview
The PowerOn workflow system is a multi-layered architecture designed for intelligent task automation and document processing. The system processes user requests through a structured pipeline involving workflow orchestration, AI-powered task planning, action execution via specialized methods, and comprehensive service integration.
## System Architecture
The workflow system is built on six distinct layers, each serving a specific purpose:
```
┌─────────────────────────────────────────────────────────────┐
│ Workflow Layer │
│ (workflows/) - Orchestration & Execution Management │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Method Layer │
│ (methods/) - Action Implementation & Parameters │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Service Layer │
│ (services/) - Business Logic & Feature Catalog │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Interface Layer │
│ (interfaces/) - Standardized Component APIs │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Connector Layer │
│ (connectors/) - External System Integration │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Datamodels & AI Core │
│ (datamodels/, aicore/) - Data Models & AI Orchestration │
└─────────────────────────────────────────────────────────────┘
```
---
## Layer 1: Workflow System (@workflows/)
### Purpose
The workflow layer is the top-level orchestration system that manages task planning, execution, and state management. It coordinates between different workflow modes (React, Actionplan, etc.) and handles the complete workflow lifecycle.
### Key Components
#### workflowManager.py
Entry point for workflow operations.
**Key Functions:**
- `workflowStart()`: Initialize and start a workflow instance
- `workflowStop()`: Stop an active workflow
- `_workflowProcess()`: Main workflow execution loop
- `_planTasks()`: Generate task plans using AI
- `_executeTasks()`: Execute tasks sequentially
**Workflow States:**
- `running`: Workflow is actively processing
- `stopped`: User manually stopped the workflow
- `completed`: Workflow finished successfully
- `failed`: Workflow encountered an error
**Example:**
```python
# Start a new workflow
workflow = await workflowManager.workflowStart(
userInput=UserInputRequest(prompt="Analyze sales data"),
workflowMode="React" # or "Actionplan"
)
```
#### Workflow Types
1. **React Mode**: Iterative task execution with up to 5 refinement steps
- Best for exploratory tasks that may require multiple passes
- Uses `maxSteps: 5` configuration
2. **Actionplan Mode**: Single-pass task execution with full action planning upfront
- Best for well-defined tasks with clear objectives
- Single execution path
**Workflow Lifecycle:**
```
start → analyzeUserInput → planTasks → executeTasks → generateResults → complete
```
#### Processing Layer (processing/)
The processing layer handles mode-specific execution logic:
- `workflowProcessor.py`: Main processor delegating to mode implementations
- `modes/modeBase.py`: Base class for all workflow modes
- `modes/modeReact.py`: React mode implementation
- `modes/modeActionplan.py`: Actionplan mode implementation
**Task Execution Flow:**
```python
# Task planning
taskPlan = await processor.generateTaskPlan(userInput, workflow)
# Task execution
for task in taskPlan.tasks:
result = await processor.executeTask(task, workflow, context)
# Process results and handover
```
### Task and Action Planning
The system operates on a two-level planning hierarchy:
1. **Task Planning**: High-level objective breakdown
- Each task represents a major objective
- Tasks are planned upfront (Actionplan) or iteratively (React)
2. **Action Planning**: Low-level operation sequencing
- Actions are specific operations to accomplish a task
- Actions are executed via method implementations
**TaskContext Structure:**
```python
TaskContext(
task_step: TaskStep,
workflow: ChatWorkflow,
workflow_id: str,
previous_results: List[str],
improvements: List[str],
retry_count: int,
# ... other context fields
)
```
### Imports Constraint
Workflows layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.services.*` - Service layer
- `modules.workflows.*` - Internal workflow components
**NO direct imports from:**
- Interfaces
- Connectors
- AI core (use via services.ai)
---
## Layer 2: Methods (@methods/)
### Purpose
Methods implement executable actions. They define the interface between workflow planning and service execution, providing a standard way to perform operations with parameter handling.
### Method Structure
All methods inherit from `MethodBase`:
```python
class MethodBase:
def __init__(self, services):
self.services = services
self.name = "method_name"
self.description = "Method description"
@property
def actions(self) -> Dict[str, Dict]:
# Auto-discovers @action decorated methods
pass
```
### Action Decorator
Methods expose executable actions using the `@action` decorator:
```python
class MethodAi(MethodBase):
@action
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Process a user prompt with AI.
Parameters:
- aiPrompt (str, required): Instruction for the AI
- documentList (list, optional): Document references
- resultType (str, optional): Output format (txt, json, etc.)
"""
aiPrompt = parameters.get("aiPrompt")
documentList = parameters.get("documentList", [])
resultType = parameters.get("resultType", "txt")
# Call service layer
result = await self.services.ai.callAiDocuments(...)
# Return ActionResult
return ActionResult.isSuccess(documents=[...])
```
### Parameter Principles
1. **Type Safety**: Parameters are typed and validated
2. **Required vs Optional**: Documented in docstrings
3. **Default Values**: Optional parameters have defaults
4. **Service Delegation**: Methods delegate to services layer
### What Belongs in Methods vs Services
**Methods handle:**
- Parameter extraction and validation
- Action-specific orchestration
- Document packaging/unpackaging
- Error handling for action execution
**Services handle:**
- Core business logic
- Standardized operations
- Data transformation
- Cross-cutting concerns (logging, stats)
### Example Method Implementation
```python
class MethodOutlook(MethodBase):
def __init__(self, services):
super().__init__(services)
self.name = "outlook"
self.description = "Microsoft Outlook operations"
@action
async def readMails(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Read emails from Outlook mailbox.
Parameters:
- mailboxName (str, required): Mailbox to read from
- filterCriteria (dict, optional): Email filter criteria
"""
# Extract parameters
mailboxName = parameters.get("mailboxName")
filterCriteria = parameters.get("filterCriteria", {})
# Delegate to service layer
emails = await self.services.outlook.readEmails(
mailbox=mailboxName,
filter=filterCriteria
)
# Package results as documents
documents = [ActionDocument(
documentName=f"email_{i}.json",
documentData=email.to_dict(),
mimeType="application/json"
) for i, email in enumerate(emails)]
return ActionResult.isSuccess(documents=documents)
```
### Available Methods
1. **methodAi**: AI processing and generation
- `process`: General AI processing with documents
- `webResearch`: Web research and crawling
- `generateImage`: AI image generation
2. **methodOutlook**: Microsoft Outlook operations
- `readMails`: Read emails
- `sendMail`: Send emails
- `manageCalendar`: Calendar operations
3. **methodSharepoint**: SharePoint operations
- `readFiles`: Read files
- `writeFiles`: Write files
- `manageFolders`: Folder management
### Imports Constraint
Methods layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.services.*` - Service layer
- `modules.workflows.methods.*` - Base classes
**NO direct imports from:**
- Interfaces
- Connectors
- AI core
- Workflow orchestration
---
## Layer 3: Services (@services/)
### Purpose
The services layer provides a comprehensive catalog of business features. All services follow a consistent structure and are accessible via `self.services.xxx`.
### Services Architecture
Each service is structured as:
```
serviceX/
- mainServiceX.py # Public API
- subCoreX.py # Core business logic
- subDocumentX.py # Document-specific logic
- subSharedX.py # Shared utilities
```
### Service Catalog
#### 1. serviceAi - AI Operations
**Purpose**: AI calls, model selection, document processing
**Key Functions:**
```python
# AI document processing
await services.ai.callAiDocuments(
prompt="Generate report",
documents=[...],
options=AiCallOptions(...)
)
# AI planning calls
await services.ai.callAiPlanning(
prompt="Plan tasks",
placeholders=None
)
# Image operations
await services.ai.readImage(prompt, imageData, mimeType)
await services.ai.generateImage(prompt, size, quality, style)
```
**Sub-modules:**
- `subCoreAi`: Core AI operations
- `subDocumentProcessing`: Document chunking and processing
- `subDocumentGeneration`: Single/multi-file generation
- `subSharedAiUtils`: Shared utilities
#### 2. serviceWorkflow - Workflow Management
**Purpose**: Workflow state, progress tracking, message handling
**Key Functions:**
```python
# Progress tracking
services.workflow.progressLogStart(operationId, stage, step, details)
services.workflow.progressLogUpdate(operationId, progress, message)
services.workflow.progressLogFinish(operationId, success)
# Message handling
services.workflow.storeMessageWithDocuments(workflow, messageData, documents)
services.workflow.getChatDocumentsFromDocumentList(documentList)
# Workflow state
services.workflow.storeLog(workflow, logEntry)
services.workflow.storeWorkflowStat(workflow, aiResponse, process)
```
#### 3. serviceExtraction - Document Extraction
**Purpose**: Extracting content from various document formats
**Features:**
- Multiple extractors (PDF, DOCX, XLSX, images, etc.)
- Chunking for large documents
- Merging strategies for AI processing
**Key Functions:**
```python
# Extract content
extracted = services.extraction.extractContent(
documents=[...],
options=ExtractionOptions(...)
)
# Merge AI results
merged = services.extraction.mergeAiResults(
extractedContent=[...],
aiResults=[...],
strategy=MergeStrategy(...)
)
```
**Plug & Play Extractors:**
- `extractorPdf.py`: PDF extraction
- `extractorDocx.py`: DOCX extraction
- `extractorImage.py`: Image OCR
- `extractorXlsx.py`: Excel extraction
- `extractorCsv.py`: CSV parsing
- And more...
**Chunkers:**
- `chunkerText.py`: Text chunking
- `chunkerTable.py`: Table chunking
- `chunkerImage.py`: Image chunking
- `chunkerStructure.py`: Structural chunking
**Mergers:**
- `mergerText.py`: Text merging
- `mergerTable.py`: Table merging
- `mergerDefault.py`: Default merging
#### 4. serviceGeneration - Document Generation
**Purpose**: Generating documents in various formats
**Features:**
- Multiple renderers (JSON, HTML, PDF, DOCX, etc.)
- Template support
- Schema validation
**Key Functions:**
```python
# Process action results into documents
documents = services.generation.processActionResultDocuments(
action_result,
action,
workflow
)
# Create documents from action results
created = services.generation.createDocumentsFromActionResult(
action_result,
action,
workflow,
message_id
)
```
**Plug & Play Renderers:**
- `rendererJson.py`: JSON rendering
- `rendererHtml.py`: HTML rendering
- `rendererPdf.py`: PDF generation
- `rendererDocx.py`: DOCX generation
- `rendererText.py`: Plain text
- And more...
#### 5. serviceWeb - Web Operations
**Purpose**: Web research, crawling, content extraction
**Key Functions:**
```python
# Perform web research
research = await services.web.performWebResearch(
prompt="Find information",
urls=[...],
country="us",
language="en",
researchDepth="general"
)
```
#### 6. serviceTicket - Ticket System Integration
**Purpose**: Integration with ticket systems (Jira, ClickUp, etc.)
#### 7. serviceSharepoint - SharePoint Operations
**Purpose**: SharePoint file and folder management
#### 8. serviceNeutralization - Data Normalization
**Purpose**: Normalizing data from different sources
#### 9. serviceNormalization - Additional Normalization
**Purpose**: Extended normalization features
#### 10. serviceUtils - Utility Functions
**Purpose**: Timestamps, formatting, validation
**Key Functions:**
```python
# Get UTC timestamp
timestamp = services.utils.timestampGetUtc()
# Other utility functions
...
```
### Services Access Pattern
All services are accessed via `self.services.xxx`:
```python
# In a method
class MyMethod(MethodBase):
@action
async def myAction(self, parameters):
# Access any service
result = await self.services.ai.callAiDocuments(...)
data = self.services.workflow.getFileInfo(fileId)
timestamp = self.services.utils.timestampGetUtc()
return ActionResult.isSuccess(...)
```
### Plug & Play Document Processing
The system supports plug & play extractors and renderers for document transformation:
**Extraction Flow:**
```
Source Document (PDF, DOCX, etc.)
→ Extractor (registered by MIME type)
→ Chunker (if document too large)
→ ContentExtracted (standard structure)
```
**Generation Flow:**
```
ContentExtracted (standard structure)
→ AI Processing
→ Merger (combine AI results)
→ Renderer (convert to target format)
→ Destination Document (JSON, HTML, PDF, etc.)
```
**Internal Data Structure:**
```python
ContentExtracted:
- id: str
- parts: List[ContentPart]
- label: str
- data: str
- metadata: ContentMetadata
- size, pages, mimeType, etc.
# Standard JSON structure for transformation
{
"documentId": "...",
"parts": [
{
"label": "text_content",
"data": "...",
"metadata": {...}
}
]
}
```
**Transformation Matrix:**
```
Any Source Format (PDF, DOCX, XLSX, images, etc.)
Standard JSON (ContentExtracted)
Any Destination Format (JSON, HTML, PDF, DOCX, etc.)
```
This enables transforming ANY document format to ANY other format through the standardized JSON intermediate structure.
### Imports Constraint
Services layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.interfaces.*` - Interface layer
- `modules.aicore.*` - AI core (for AI services)
- `modules.services.*` - Other services
**NO direct imports from:**
- Connectors (use via interfaces)
- Workflow orchestration
- Methods
---
## Layer 4: Interfaces (@interfaces/)
### Purpose
Interfaces provide standardized APIs for accessing different data sources and components. They abstract away implementation details and enable pluggable connector systems.
### Interface Structure
Each interface defines:
1. **Standardized Methods**: Common operations across all implementations
2. **Data Models**: Interface-specific data models
3. **Access Control**: User permission management
### Available Interfaces
#### 1. interfaceDbChatAccess & interfaceDbChatObjects
**Purpose**: Chat system database operations
**Key Classes:**
- `ChatWorkflow`: Workflow instances
- `ChatMessage`: Messages in a workflow
- `ChatDocument`: Documents attached to messages
- `ChatLog`: Workflow logs
- `ChatStat`: Workflow statistics
**Key Functions:**
```python
# Workflow operations
workflow = interface.createWorkflow(workflowData)
workflow = interface.getWorkflow(workflowId)
interface.updateWorkflow(workflowId, updateData)
# Message operations
message = interface.createMessage(messageData)
messages = interface.getMessages(workflowId)
# Document operations
document = interface.createDocument(documentData)
documents = interface.getDocuments(messageId)
```
#### 2. interfaceDbAppAccess & interfaceDbAppObjects
**Purpose**: Application database operations
**Key Classes:**
- `UserInDB`: User accounts
- `VoiceSettings`: Voice configuration
- `Connection`: External connections
#### 3. interfaceDbComponentAccess & interfaceDbComponentObjects
**Purpose**: Component database operations (files, data storage)
**Key Classes:**
- `FileItem`: File metadata
- `FileData`: File content
**Key Functions:**
```python
# File operations
fileItem = interface.createFile(name, mimeType, content)
fileData = interface.getFileData(fileId)
interface.updateFileData(fileId, content)
```
#### 4. interfaceAiObjects
**Purpose**: AI model management and calls
**Key Functions:**
```python
# AI calls
response = await interface.call(request)
# Model selection
model = interface.getModelsByOperationType(operationType)
```
#### 5. interfaceTicketObjects
**Purpose**: Ticket system interface
**Key Classes:**
- `TicketBase`: Base ticket interface
- `TicketFieldAttribute`: Ticket field definitions
### Interface Pattern
Interfaces use a factory pattern:
```python
def getInterface(user: User, workflow: ChatWorkflow = None) -> InterfaceType:
"""Factory function that returns appropriate implementation."""
return InterfaceImplementation(user, workflow)
```
### Connector Integration
Interfaces can have multiple connector implementations:
```
interfaceDbChat
├─ connectorDbPostgre.py
├─ connectorDbJson.py
└─ (future connectors)
```
Each connector:
- Implements interface requirements
- Handles data source specifics
- Returns standardized data models
- Ensures interface compatibility
### Imports Constraint
Interfaces layer imports:
- `modules.datamodels.*` - Data models
- Database/connector specific libraries
**NO direct imports from:**
- Services
- Workflows
- Methods
---
## Layer 5: Connectors (@connectors/)
### Purpose
Connectors integrate external systems and services. They implement interface contracts and provide standardized access to diverse data sources.
### Naming Convention
Connector naming follows the pattern:
```
connector{InterfaceType}{Provider}.py
```
Examples:
- `connectorDbPostgre.py` - PostgreSQL database connector
- `connectorDbJson.py` - JSON file connector
- `connectorTicketsJira.py` - Jira ticket connector
- `connectorTicketsClickup.py` - ClickUp connector
- `connectorVoiceGoogle.py` - Google Voice connector
### Connector Structure
Every connector must:
1. **Implement Interface Contract**:
```python
class ConnectorTicketsJira(TicketBase):
def __init__(self, *, apiUsername, apiToken, apiUrl, ...):
# Initialize connector
pass
async def read_tasks(self, *, limit: int = 0) -> list[dict]:
# Implement interface method
pass
async def create_task(self, taskData: dict) -> dict:
# Implement interface method
pass
```
2. **Ensure Standardization**:
- Return standardized data models
- Handle authentication consistently
- Provide error handling
- Log operations
3. **Respect Interface Requirements**:
- Must implement all required methods
- Must return data in expected format
- Must handle errors gracefully
### Connector Examples
#### Database Connectors
```python
# PostgreSQL Connector
class ConnectorDbPostgre:
async def executeQuery(self, query: str, params: dict):
# Execute against PostgreSQL
pass
```
```python
# JSON File Connector
class ConnectorDbJson:
async def executeQuery(self, query: str, params: dict):
# Execute against JSON files
pass
```
#### Ticket System Connectors
```python
# Jira Connector
class ConnectorTicketsJira(TicketBase):
async def read_attributes(self) -> List[TicketFieldAttribute]:
# Read Jira field attributes
pass
async def read_tasks(self, *, limit: int = 0) -> List[dict]:
# Read Jira tasks
pass
```
```python
# ClickUp Connector
class ConnectorTicketsClickup(TicketBase):
async def read_attributes(self) -> List[TicketFieldAttribute]:
# Read ClickUp field attributes
pass
async def read_tasks(self, *, limit: int = 0) -> List[dict]:
# Read ClickUp tasks
pass
```
### Connector Registration
Connectors are registered dynamically:
```python
# In interface
connectors = [
ConnectorTicketsJira(credentials),
ConnectorTicketsClickup(credentials)
]
```
### Imports Constraint
Connectors layer imports:
- `modules.datamodels.*` - Data models
- `modules.interfaces.*` - Interface contracts
- External libraries (aiohttp, etc.)
**NO direct imports from:**
- Services
- Workflows
- Methods
---
## Layer 6: Data Models (@datamodels/)
### Purpose
Data models define the core data structures used throughout the system. They use Pydantic for validation and type safety.
### Key Data Models
#### Chat Models (datamodelChat.py)
```python
ChatWorkflow:
- id: str
- name: str
- status: str
- workflowMode: str # "React" or "Actionplan"
- currentRound: int
- currentTask: int
- currentAction: int
- totalTasks: int
- totalActions: int
- messages: List[ChatMessage]
- logs: List[ChatLog]
- stats: List[ChatStat]
ChatMessage:
- id: str
- workflowId: str
- role: str # "user" or "assistant"
- message: str
- documents: List[ChatDocument]
- documentsLabel: str
- roundNumber: int
- taskNumber: int
- actionNumber: int
- taskProgress: str
- actionProgress: str
TaskContext:
- task_step: TaskStep
- workflow: ChatWorkflow
- workflow_id: str
- previous_results: List[str]
- available_documents: List[ChatDocument]
- improvements: List[str]
- retry_count: int
TaskStep:
- objective: str
- description: str
- criteria: List[str]
TaskPlan:
- tasks: List[TaskStep]
- summary: str
ActionResult:
- success: bool
- documents: List[ActionDocument]
- error: str (optional)
```
#### AI Models (datamodelAi.py)
```python
AiModel:
- name: str
- displayName: str
- connectorType: str
- contextLength: int
- qualityRating: int # 1-10
- speedRating: int # 1-10
- costPer1kTokensInput: float
- costPer1kTokensOutput: float
- operationTypes: List[OperationTypeRating]
AiCallOptions:
- operationType: OperationTypeEnum
- resultFormat: str
- processingMode: ProcessingModeEnum
- priority: PriorityEnum
- modelName: str (optional)
AiCallRequest:
- prompt: str
- context: str (optional)
- options: AiCallOptions
- placeholders: List[PromptPlaceholder] (optional)
AiCallResponse:
- content: str
- modelName: str
- priceCHF: float
- processingTime: float
- bytesSent: int
- bytesReceived: int
- errorCount: int
```
#### Extraction Models (datamodelExtraction.py)
```python
ContentPart:
- label: str
- data: str
- metadata: ContentMetadata
- typeGroup: str
ContentExtracted:
- id: str
- parts: List[ContentPart]
ContentMetadata:
- size: int
- mimeType: str
- pages: int (optional)
- error: str (optional)
MergeStrategy:
- mergeType: str
- config: dict
```
### Data Model Usage
Data models are imported across all layers:
```python
# In any layer
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage
from modules.datamodels.datamodelAi import AiCallOptions
```
---
## AI Core System (@aicore/)
### Purpose
The AI core provides dynamic model selection based on KPIs, business requirements, and operation types. It harmonizes model parameters for optimal performance.
### Key Components
#### aicoreModelRegistry.py
**Purpose**: Centralized model registry for all AI connectors
**Key Functions:**
```python
# Discover and register connectors
connectors = modelRegistry.discoverConnectors()
modelRegistry.registerConnector(connector)
# Get available models
availableModels = modelRegistry.getAvailableModels()
# Get models by operation type
models = modelRegistry.getModelsByOperationType(operationType)
# Get specific model
model = modelRegistry.getModel(modelName)
```
#### aicoreModelSelector.py
**Purpose**: Intelligent model selection based on requirements
**Selection Criteria:**
1. **Operation Type**: Primary filter (must match)
2. **Context Length**: Must fit within model limits
3. **Prompt Size**: Checks if prompt fits
4. **Processing Mode**: Basic/Advanced/Detailed
5. **Priority**: Speed/Quality/Cost/Balanced
**Selection Algorithm:**
```python
def selectModel(prompt, context, options, availableModels):
# Step 1: Filter by operation type
operationFiltered = filterByOperationType(availableModels, options)
# Step 2: Filter by prompt size
promptFiltered = filterByPromptSize(operationFiltered, prompt)
# Step 3: Calculate scores
scoredModels = calculateScores(promptFiltered, prompt, context, options)
# Step 4: Sort by score
sortedModels = sortByScore(scoredModels)
return sortedModels[0] # Best model
```
**Scoring Formula:**
```
score = (
operationTypeRating * 1000.0 + # Primary
sizeRating + # Context fit
processingModeRating + # Mode compatibility
priorityRating # Speed/Quality/Cost
)
```
#### aicorePlugin*.py
AI connectors for different providers:
- **aicorePluginOpenai**: OpenAI models (GPT-4, etc.)
- **aicorePluginAnthropic**: Claude models
- **aicorePluginPerplexity**: Perplexity AI
- **aicorePluginTavily**: Tavily search
- **aicorePluginInternal**: Internal models
**Connector Interface:**
```python
class BaseConnectorAi(ABC):
def getModels(self) -> List[AiModel]:
"""Return all available models from this connector."""
pass
def getConnectorType(self) -> str:
"""Return connector type identifier."""
pass
```
### Dynamic Model Selection
**Selection Process:**
```
1. Analyze request (prompt, context, operation type)
2. Get all available models from registry
3. Filter by operation type (MUST match)
4. Filter by size constraints
5. Score each model
6. Select best model
7. Execute with failover to backup models
```
**Failover Mechanism:**
```python
# Try models in priority order
for model in failoverModels:
try:
result = await callModel(model, prompt, context)
return result
except Exception:
# Try next model
continue
# If all models fail
raise Exception("All models failed")
```
### Operation Types
Models are rated for different operation types:
- `GENERATE`: Text generation
- `ANALYZE`: Text analysis
- `EXTRACT`: Information extraction
- `PLAN`: Task planning
- `IMAGE_GENERATE`: Image generation
- `IMAGE_ANALYZE`: Image analysis
**Rating System:**
- Each model has ratings per operation type (1-10)
- Higher rating = better for that operation
- Rating * 1000 is primary sorting criteria
### Processing Modes
- `BASIC`: Simple operations, fast processing
- `ADVANCED`: Moderate complexity
- `DETAILED`: High complexity, thorough analysis
**Compatibility:**
- Models can handle their mode or downgrade
- Mode compatibility affects scoring
### Priority Handling
- `BALANCED`: Default, considers all factors
- `SPEED`: Optimize for speed
- `QUALITY`: Optimize for quality
- `COST`: Optimize for cost
**Priority Scoring:**
```python
if priority == "SPEED":
score += model.speedRating / 10.0
elif priority == "QUALITY":
score += model.qualityRating / 10.0
elif priority == "COST":
score += costOptimization(model)
```
### Harmonized Parameters
The system harmonizes parameters across different model APIs:
- Temperature/sampling parameters
- Max tokens settings
- Context length handling
- Response format handling
### Usage Example
```python
# In service layer
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
# Get available models
availableModels = modelRegistry.getAvailableModels()
# Create options
options = AiCallOptions(
operationType=OperationTypeEnum.GENERATE,
resultFormat="json",
processingMode=ProcessingModeEnum.DETAILED,
priority=PriorityEnum.QUALITY
)
# Select best model
selectedModel = modelSelector.selectModel(
prompt=prompt,
context=context,
options=options,
availableModels=availableModels
)
# Execute with failover
response = await executeWithFailover(
models=modelSelector.getFailoverModelList(...),
prompt=prompt,
context=context,
options=options
)
```
---
## Data Flow
### Complete Request Flow
```
User Input
WorkflowManager.workflowStart()
WorkflowProcessor.generateTaskPlan()
AI Planning Call (services.ai.callAiPlanning)
Task Plan Created
For each Task:
ActionPlan Generated
For each Action:
Method Action Executed
Service Layer Called
Interface Layer Called
Connector Executes
Results Returned
Results Aggregated
Final Response Generated
```
### Document Processing Flow
```
Source Document (PDF, DOCX, etc.)
services.extraction.extractContent()
Extractor (extractorPdf, extractorDocx, etc.)
Chunker (if document too large)
ContentExtracted (standard JSON structure)
AI Processing (services.ai.callAiDocuments)
Merger (merge AI results)
Renderer (rendererHtml, rendererPdf, etc.)
Destination Document (HTML, PDF, etc.)
```
---
## Design Principles
### 1. Layered Architecture
Each layer has specific responsibilities and imports only from layers below it.
### 2. Standardization
Standardized data models, interfaces, and method signatures enable plug & play components.
### 3. Dynamic Selection
AI models selected dynamically based on requirements, not hardcoded.
### 4. Failover Mechanism
Automatic failover to backup models ensures reliability.
### 5. Progress Tracking
Complete progress tracking throughout workflow execution.
### 6. Document Transformation
Standardized JSON intermediate format enables any-to-any document transformation.
### 7. Modularity
Clear separation between workflows, methods, services, interfaces, and connectors.
### 8. Type Safety
Pydantic models ensure type safety and validation across the system.
---
## Development Guidelines
### Adding a New Method
1. Create `methodX.py` in `modules/workflows/methods/`
2. Inherit from `MethodBase`
3. Implement `@action` decorated methods
4. Follow parameter principles
5. Delegate to services layer
```python
class MethodX(MethodBase):
def __init__(self, services):
super().__init__(services)
self.name = "x"
self.description = "X operations"
@action
async def operation(self, parameters):
# Extract parameters
# Call services layer
# Return ActionResult
pass
```
### Adding a New Service
1. Create `serviceX/` directory
2. Implement `mainServiceX.py` with public API
3. Add sub-modules as needed
4. Register in `services/__init__.py`
```python
# In services/__init__.py
from .serviceX.mainServiceX import ServiceX
self.x = PublicService(ServiceX(self))
```
### Adding a New Connector
1. Create `connectorInterfaceXProvider.py` in `connectors/`
2. Implement interface contract
3. Ensure standardization
4. Handle errors gracefully
5. Register in interface
### Adding a New Extractor/Renderer
1. Create `extractorX.py` or `rendererX.py`
2. Register in `ExtractorRegistry` or `RendererRegistry`
3. Implement interface:
- `extract(documentBytes, fileName, mimeType) -> ContentExtracted`
- or `render(contentData, options) -> bytes`
---
## Error Handling
### Workflow Errors
- Caught at workflow processor level
- Workflow status set to "failed"
- Error logged and returned to user
### Action Errors
- Caught at method level
- ActionResult returned with `success=False`
- Error message in `error` field
### Service Errors
- Caught at service level
- Logged with context
- Re-raised or handled gracefully
### Connector Errors
- Caught at connector level
- Retry with next connector (if applicable)
- Error logged and escalated
---
## Logging
### Log Levels
- `INFO`: Normal operation flow
- `DEBUG`: Detailed debugging information
- `WARNING`: Non-critical issues
- `ERROR`: Critical errors
### Log Structure
```
timestamp | level | module | message | context
```
### Workflow Logs
```python
self.services.workflow.storeLog(workflow, {
"message": "Task completed",
"type": "info",
"status": "success",
"progress": 100
})
```
---
## Testing
### Unit Tests
Test individual components in isolation.
### Integration Tests
Test interaction between components.
### Workflow Tests
Test complete workflow execution.
### Mock Services
Use mock services for testing without external dependencies.
---
## Performance Considerations
### Async/Await
All I/O operations are async for optimal performance.
### Lazy Initialization
Services initialized lazily to reduce startup time.
### Caching
Model registry caches connector instances.
### Chunking
Large documents chunked for efficient processing.
### Failover
Automatic failover ensures reliability without sacrificing performance.
---
## Security
### Access Control
User permission checks at interface level.
### Data Validation
Pydantic models validate all inputs.
### Sanitization
Prompt content sanitized to prevent injection.
### Encryption
Sensitive data encrypted at rest and in transit.
---
## Conclusion
The PowerOn workflow system is a sophisticated, multi-layered architecture that provides:
- **Intelligent Task Automation**: AI-powered task and action planning
- **Document Processing**: Plug & play extractors and renderers for any format
- **Service Catalog**: Comprehensive business feature catalog
- **Dynamic Model Selection**: Optimal AI model selection based on requirements
- **Standardized Interfaces**: Consistent APIs across all components
- **Extensibility**: Easy to add new methods, services, connectors
The system emphasizes:
- Separation of concerns
- Standardization
- Modularity
- Type safety
- Performance
- Reliability