Workflow System - Developer Documentation
Overview
The PowerOn workflow system is a multi-layered architecture designed for intelligent task automation and document processing. The system processes user requests through a structured pipeline involving workflow orchestration, AI-powered task planning, action execution via specialized methods, and comprehensive service integration.
System Architecture
The workflow system is built on six distinct layers, each serving a specific purpose:
┌─────────────────────────────────────────────────────────────┐
│ Workflow Layer │
│ (workflows/) - Orchestration & Execution Management │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Method Layer │
│ (methods/) - Action Implementation & Parameters │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Service Layer │
│ (services/) - Business Logic & Feature Catalog │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Interface Layer │
│ (interfaces/) - Standardized Component APIs │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Connector Layer │
│ (connectors/) - External System Integration │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Datamodels & AI Core │
│ (datamodels/, aicore/) - Data Models & AI Orchestration │
└─────────────────────────────────────────────────────────────┘
Layer 1: Workflow System (@workflows/)
Purpose
The workflow layer is the top-level orchestration system that manages task planning, execution, and state management. It coordinates between different workflow modes (React, Actionplan, etc.) and handles the complete workflow lifecycle.
Key Components
workflowManager.py
Entry point for workflow operations.
Key Functions:
- workflowStart(): Initialize and start a workflow instance
- workflowStop(): Stop an active workflow
- _workflowProcess(): Main workflow execution loop
- _planTasks(): Generate task plans using AI
- _executeTasks(): Execute tasks sequentially
Workflow States:
- running: Workflow is actively processing
- stopped: User manually stopped the workflow
- completed: Workflow finished successfully
- failed: Workflow encountered an error
Example:
# Start a new workflow
workflow = await workflowManager.workflowStart(
    userInput=UserInputRequest(prompt="Analyze sales data"),
    workflowMode="React"  # or "Actionplan"
)
Workflow Types
- React Mode: Iterative task execution with up to 5 refinement steps
  - Best for exploratory tasks that may require multiple passes
  - Uses the maxSteps: 5 configuration
- Actionplan Mode: Single-pass task execution with full action planning upfront
  - Best for well-defined tasks with clear objectives
  - Single execution path
Workflow Lifecycle:
start → analyzeUserInput → planTasks → executeTasks → generateResults → complete
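The states and lifecycle above can be sketched as a small state model. This is an illustrative sketch only: the names WorkflowStatus, LIFECYCLE, nextStage and statusForStage are hypothetical, and the rule that a workflow counts as "running" until it reaches the final stage is an assumption, not something this document states.

```python
from enum import Enum
from typing import Optional


class WorkflowStatus(str, Enum):
    """The four workflow states listed above."""
    RUNNING = "running"
    STOPPED = "stopped"
    COMPLETED = "completed"
    FAILED = "failed"


# Lifecycle stages in the order given in the lifecycle line above.
LIFECYCLE = [
    "start",
    "analyzeUserInput",
    "planTasks",
    "executeTasks",
    "generateResults",
    "complete",
]


def nextStage(current: str) -> Optional[str]:
    """Return the stage that follows `current`, or None after the last stage."""
    index = LIFECYCLE.index(current)  # raises ValueError for unknown stages
    return LIFECYCLE[index + 1] if index + 1 < len(LIFECYCLE) else None


def statusForStage(stage: str) -> WorkflowStatus:
    """Assumed mapping: a workflow is 'running' until it reaches 'complete'."""
    return WorkflowStatus.COMPLETED if stage == "complete" else WorkflowStatus.RUNNING
```

The stopped and failed states are not derived from a stage here because they result from a user action or an error rather than from normal progression.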
Processing Layer (processing/)
The processing layer handles mode-specific execution logic:
- workflowProcessor.py: Main processor delegating to mode implementations
- modes/modeBase.py: Base class for all workflow modes
- modes/modeReact.py: React mode implementation
- modes/modeActionplan.py: Actionplan mode implementation
Task Execution Flow:
# Task planning
taskPlan = await processor.generateTaskPlan(userInput, workflow)
# Task execution
for task in taskPlan.tasks:
    result = await processor.executeTask(task, workflow, context)
    # Process results and handover
Task and Action Planning
The system operates on a two-level planning hierarchy:
- Task Planning: High-level objective breakdown
  - Each task represents a major objective
  - Tasks are planned upfront (Actionplan) or iteratively (React)
- Action Planning: Low-level operation sequencing
  - Actions are specific operations to accomplish a task
  - Actions are executed via method implementations
TaskContext Structure:
TaskContext(
    task_step: TaskStep,
    workflow: ChatWorkflow,
    workflow_id: str,
    previous_results: List[str],
    improvements: List[str],
    retry_count: int,
    # ... other context fields
)
Imports Constraint
Workflows layer imports ONLY from:
- modules.datamodels.* - Data models
- modules.services.* - Service layer
- modules.workflows.* - Internal workflow components
NO direct imports from:
- Interfaces
- Connectors
- AI core (use via services.ai)
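An import constraint like this can be checked mechanically. The following is a minimal sketch, assuming the check runs on source text and that only imports starting with `modules.` are subject to the rule. The function name findForbiddenImports is hypothetical, and the document does not say whether the project enforces the constraint with tooling.

```python
import ast
from typing import List, Tuple

# Allowed prefixes for the workflows layer, taken from the list above.
ALLOWED_PREFIXES: Tuple[str, ...] = (
    "modules.datamodels",
    "modules.services",
    "modules.workflows",
)


def findForbiddenImports(source: str, allowed: Tuple[str, ...] = ALLOWED_PREFIXES) -> List[str]:
    """Return every `modules.*` import in `source` that is outside `allowed`."""
    forbidden: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        else:
            continue
        for name in names:
            # Only project-internal imports are checked; stdlib and third-party pass.
            if not (name == "modules" or name.startswith("modules.")):
                continue
            if not any(name == p or name.startswith(p + ".") for p in allowed):
                forbidden.append(name)
    return forbidden
```

The same function could cover the other layers by passing the prefix tuple from that layer's Imports Constraint.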
Layer 2: Methods (@methods/)
Purpose
Methods implement executable actions. They define the interface between workflow planning and service execution, providing a standard way to perform operations with parameter handling.
Method Structure
All methods inherit from MethodBase:
class MethodBase:
    def __init__(self, services):
        self.services = services
        self.name = "method_name"
        self.description = "Method description"

    @property
    def actions(self) -> Dict[str, Dict]:
        # Auto-discovers @action decorated methods
        pass
Action Decorator
Methods expose executable actions using the @action decorator:
class MethodAi(MethodBase):
    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Process a user prompt with AI.
        Parameters:
        - aiPrompt (str, required): Instruction for the AI
        - documentList (list, optional): Document references
        - resultType (str, optional): Output format (txt, json, etc.)
        """
        aiPrompt = parameters.get("aiPrompt")
        documentList = parameters.get("documentList", [])
        resultType = parameters.get("resultType", "txt")
        # Call service layer
        result = await self.services.ai.callAiDocuments(...)
        # Return ActionResult
        return ActionResult.isSuccess(documents=[...])
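The auto-discovery mentioned in MethodBase is not spelled out in this document. One way it could work is sketched below, assuming the decorator only sets a marker attribute and the actions property scans the class for it. The `_isAction` attribute, the shape of the returned dictionary, and the MethodEcho class are all hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


def action(func: Callable) -> Callable:
    """Mark a coroutine method as an executable action (hypothetical marker attribute)."""
    func._isAction = True
    return func


class MethodBase:
    def __init__(self, services: Any):
        self.services = services
        self.name = "method_name"
        self.description = "Method description"

    @property
    def actions(self) -> Dict[str, Dict]:
        """Collect every method on this class that carries the @action marker."""
        found: Dict[str, Dict] = {}
        # Scan the class, not the instance, so this property is not re-entered.
        for attrName, member in inspect.getmembers(type(self), inspect.isfunction):
            if getattr(member, "_isAction", False):
                found[attrName] = {
                    "description": inspect.getdoc(member) or "",
                    "handler": getattr(self, attrName),  # bound method
                }
        return found


class MethodEcho(MethodBase):
    """Hypothetical method used only to demonstrate discovery."""

    @action
    async def echo(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Return the parameters unchanged."""
        return parameters

    async def helper(self) -> None:
        """Not decorated, so it is not exposed as an action."""
```

With this approach, a planner can list `MethodEcho(services).actions` to see the available action names and their docstrings without any manual registration.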
Parameter Principles
- Type Safety: Parameters are typed and validated
- Required vs Optional: Documented in docstrings
- Default Values: Optional parameters have defaults
- Service Delegation: Methods delegate to services layer
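A minimal sketch of how these principles could be applied inside an action follows, using the parameters of MethodAi.process as the example. The spec format and the resolveParameters helper are assumptions made for illustration; the document does not describe the actual validation mechanism.

```python
from typing import Any, Dict, Tuple

# Hypothetical spec format: name -> (expected type, required flag, default value).
ParamSpec = Dict[str, Tuple[type, bool, Any]]

PROCESS_SPEC: ParamSpec = {
    "aiPrompt": (str, True, None),
    "documentList": (list, False, []),
    "resultType": (str, False, "txt"),
}


def resolveParameters(parameters: Dict[str, Any], spec: ParamSpec) -> Dict[str, Any]:
    """Validate `parameters` against `spec` and fill in defaults for optional values."""
    resolved: Dict[str, Any] = {}
    for name, (expectedType, required, default) in spec.items():
        if name not in parameters or parameters[name] is None:
            if required:
                raise ValueError(f"Missing required parameter: {name}")
            # Copy mutable defaults so callers never share one list or dict.
            resolved[name] = default.copy() if isinstance(default, (list, dict)) else default
            continue
        value = parameters[name]
        if not isinstance(value, expectedType):
            raise TypeError(
                f"Parameter {name} must be {expectedType.__name__}, got {type(value).__name__}"
            )
        resolved[name] = value
    return resolved
```

Failing early on a missing required parameter keeps the error in the method layer, where it can be turned into a failed ActionResult before any service is called.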
What Belongs in Methods vs Services
Methods handle:
- Parameter extraction and validation
- Action-specific orchestration
- Document packaging/unpackaging
- Error handling for action execution
Services handle:
- Core business logic
- Standardized operations
- Data transformation
- Cross-cutting concerns (logging, stats)
Example Method Implementation
class MethodOutlook(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "outlook"
        self.description = "Microsoft Outlook operations"

    @action
    async def readMails(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Read emails from Outlook mailbox.
        Parameters:
        - mailboxName (str, required): Mailbox to read from
        - filterCriteria (dict, optional): Email filter criteria
        """
        # Extract parameters
        mailboxName = parameters.get("mailboxName")
        filterCriteria = parameters.get("filterCriteria", {})
        # Delegate to service layer
        emails = await self.services.outlook.readEmails(
            mailbox=mailboxName,
            filter=filterCriteria
        )
        # Package results as documents
        documents = [ActionDocument(
            documentName=f"email_{i}.json",
            documentData=email.to_dict(),
            mimeType="application/json"
        ) for i, email in enumerate(emails)]
        return ActionResult.isSuccess(documents=documents)
Available Methods
- methodAi: AI processing and generation
  - process: General AI processing with documents
  - webResearch: Web research and crawling
  - generateImage: AI image generation
- methodOutlook: Microsoft Outlook operations
  - readMails: Read emails
  - sendMail: Send emails
  - manageCalendar: Calendar operations
- methodSharepoint: SharePoint operations
  - readFiles: Read files
  - writeFiles: Write files
  - manageFolders: Folder management
Imports Constraint
Methods layer imports ONLY from:
- modules.datamodels.* - Data models
- modules.services.* - Service layer
- modules.workflows.methods.* - Base classes
NO direct imports from:
- Interfaces
- Connectors
- AI core
- Workflow orchestration
Layer 3: Services (@services/)
Purpose
The services layer provides a comprehensive catalog of business features. All services follow a consistent structure and are accessible via self.services.xxx.
Services Architecture
Each service is structured as:
serviceX/
- mainServiceX.py # Public API
- subCoreX.py # Core business logic
- subDocumentX.py # Document-specific logic
- subSharedX.py # Shared utilities
Service Catalog
1. serviceAi - AI Operations
Purpose: AI calls, model selection, document processing
Key Functions:
# AI document processing
await services.ai.callAiDocuments(
    prompt="Generate report",
    documents=[...],
    options=AiCallOptions(...)
)
# AI planning calls
await services.ai.callAiPlanning(
    prompt="Plan tasks",
    placeholders=None
)
# Image operations
await services.ai.readImage(prompt, imageData, mimeType)
await services.ai.generateImage(prompt, size, quality, style)
Sub-modules:
- subCoreAi: Core AI operations
- subDocumentProcessing: Document chunking and processing
- subDocumentGeneration: Single/multi-file generation
- subSharedAiUtils: Shared utilities
2. serviceWorkflow - Workflow Management
Purpose: Workflow state, progress tracking, message handling
Key Functions:
# Progress tracking
services.workflow.progressLogStart(operationId, stage, step, details)
services.workflow.progressLogUpdate(operationId, progress, message)
services.workflow.progressLogFinish(operationId, success)
# Message handling
services.workflow.storeMessageWithDocuments(workflow, messageData, documents)
services.workflow.getChatDocumentsFromDocumentList(documentList)
# Workflow state
services.workflow.storeLog(workflow, logEntry)
services.workflow.storeWorkflowStat(workflow, aiResponse, process)
3. serviceExtraction - Document Extraction
Purpose: Extracting content from various document formats
Features:
- Multiple extractors (PDF, DOCX, XLSX, images, etc.)
- Chunking for large documents
- Merging strategies for AI processing
Key Functions:
# Extract content
extracted = services.extraction.extractContent(
    documents=[...],
    options=ExtractionOptions(...)
)
# Merge AI results
merged = services.extraction.mergeAiResults(
    extractedContent=[...],
    aiResults=[...],
    strategy=MergeStrategy(...)
)
Plug & Play Extractors:
- extractorPdf.py: PDF extraction
- extractorDocx.py: DOCX extraction
- extractorImage.py: Image OCR
- extractorXlsx.py: Excel extraction
- extractorCsv.py: CSV parsing
- And more...
Chunkers:
- chunkerText.py: Text chunking
- chunkerTable.py: Table chunking
- chunkerImage.py: Image chunking
- chunkerStructure.py: Structural chunking
Mergers:
- mergerText.py: Text merging
- mergerTable.py: Table merging
- mergerDefault.py: Default merging
4. serviceGeneration - Document Generation
Purpose: Generating documents in various formats
Features:
- Multiple renderers (JSON, HTML, PDF, DOCX, etc.)
- Template support
- Schema validation
Key Functions:
# Process action results into documents
documents = services.generation.processActionResultDocuments(
    action_result,
    action,
    workflow
)
# Create documents from action results
created = services.generation.createDocumentsFromActionResult(
    action_result,
    action,
    workflow,
    message_id
)
Plug & Play Renderers:
- rendererJson.py: JSON rendering
- rendererHtml.py: HTML rendering
- rendererPdf.py: PDF generation
- rendererDocx.py: DOCX generation
- rendererText.py: Plain text
- And more...
5. serviceWeb - Web Operations
Purpose: Web research, crawling, content extraction
Key Functions:
# Perform web research
research = await services.web.performWebResearch(
    prompt="Find information",
    urls=[...],
    country="us",
    language="en",
    researchDepth="general"
)
6. serviceTicket - Ticket System Integration
Purpose: Integration with ticket systems (Jira, ClickUp, etc.)
7. serviceSharepoint - SharePoint Operations
Purpose: SharePoint file and folder management
8. serviceNeutralization - Data Normalization
Purpose: Normalizing data from different sources
9. serviceNormalization - Additional Normalization
Purpose: Extended normalization features
10. serviceUtils - Utility Functions
Purpose: Timestamps, formatting, validation
Key Functions:
# Get UTC timestamp
timestamp = services.utils.timestampGetUtc()
# Other utility functions
...
Services Access Pattern
All services are accessed via self.services.xxx:
# In a method
class MyMethod(MethodBase):
    @action
    async def myAction(self, parameters):
        # Access any service
        result = await self.services.ai.callAiDocuments(...)
        data = self.services.workflow.getFileInfo(fileId)
        timestamp = self.services.utils.timestampGetUtc()
        return ActionResult.isSuccess(...)
Plug & Play Document Processing
The system supports plug & play extractors and renderers for document transformation:
Extraction Flow:
Source Document (PDF, DOCX, etc.)
→ Extractor (registered by MIME type)
→ Chunker (if document too large)
→ ContentExtracted (standard structure)
Generation Flow:
ContentExtracted (standard structure)
→ AI Processing
→ Merger (combine AI results)
→ Renderer (convert to target format)
→ Destination Document (JSON, HTML, PDF, etc.)
Internal Data Structure:
ContentExtracted:
- id: str
- parts: List[ContentPart]
  - label: str
  - data: str
  - metadata: ContentMetadata
    - size, pages, mimeType, etc.
# Standard JSON structure for transformation
{
    "documentId": "...",
    "parts": [
        {
            "label": "text_content",
            "data": "...",
            "metadata": {...}
        }
    ]
}
Transformation Matrix:
Any Source Format (PDF, DOCX, XLSX, images, etc.)
↓
Standard JSON (ContentExtracted)
↓
Any Destination Format (JSON, HTML, PDF, DOCX, etc.)
This enables transforming ANY document format to ANY other format through the standardized JSON intermediate structure.
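The any-to-any idea can be sketched with two registries keyed by MIME type: one extractor per source format and one renderer per target format. This is a reduced illustration under stated assumptions: the registry dictionaries, the function signatures and the transform helper are hypothetical, the AI processing and merger steps are left out, and the dataclasses stand in for the real models.

```python
import json
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ContentPart:
    label: str
    data: str
    metadata: Dict[str, object] = field(default_factory=dict)


@dataclass
class ContentExtracted:
    id: str
    parts: List[ContentPart]


# Hypothetical registries keyed by MIME type; the real registries may differ.
EXTRACTORS: Dict[str, Callable[[bytes, str], ContentExtracted]] = {}
RENDERERS: Dict[str, Callable[[ContentExtracted], bytes]] = {}


def extractText(documentBytes: bytes, fileName: str) -> ContentExtracted:
    """Plain-text extractor: one part holding the decoded text."""
    text = documentBytes.decode("utf-8")
    part = ContentPart("text_content", text, {"mimeType": "text/plain", "size": len(documentBytes)})
    return ContentExtracted(id=fileName, parts=[part])


def renderJson(content: ContentExtracted) -> bytes:
    """JSON renderer producing the standard structure shown above."""
    payload = {
        "documentId": content.id,
        "parts": [
            {"label": p.label, "data": p.data, "metadata": p.metadata} for p in content.parts
        ],
    }
    return json.dumps(payload).encode("utf-8")


EXTRACTORS["text/plain"] = extractText
RENDERERS["application/json"] = renderJson


def transform(documentBytes: bytes, fileName: str, sourceMime: str, targetMime: str) -> bytes:
    """Source format -> ContentExtracted -> target format."""
    if sourceMime not in EXTRACTORS:
        raise KeyError(f"No extractor registered for {sourceMime}")
    if targetMime not in RENDERERS:
        raise KeyError(f"No renderer registered for {targetMime}")
    return RENDERERS[targetMime](EXTRACTORS[sourceMime](documentBytes, fileName))
```

Because every extractor produces the same intermediate structure, supporting N source formats and M target formats needs N + M components rather than N × M converters.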
Imports Constraint
Services layer imports ONLY from:
- modules.datamodels.* - Data models
- modules.interfaces.* - Interface layer
- modules.aicore.* - AI core (for AI services)
- modules.services.* - Other services
NO direct imports from:
- Connectors (use via interfaces)
- Workflow orchestration
- Methods
Layer 4: Interfaces (@interfaces/)
Purpose
Interfaces provide standardized APIs for accessing different data sources and components. They abstract away implementation details and enable pluggable connector systems.
Interface Structure
Each interface defines:
- Standardized Methods: Common operations across all implementations
- Data Models: Interface-specific data models
- Access Control: User permission management
Available Interfaces
1. interfaceDbChatAccess & interfaceDbChatObjects
Purpose: Chat system database operations
Key Classes:
- ChatWorkflow: Workflow instances
- ChatMessage: Messages in a workflow
- ChatDocument: Documents attached to messages
- ChatLog: Workflow logs
- ChatStat: Workflow statistics
Key Functions:
# Workflow operations
workflow = interface.createWorkflow(workflowData)
workflow = interface.getWorkflow(workflowId)
interface.updateWorkflow(workflowId, updateData)
# Message operations
message = interface.createMessage(messageData)
messages = interface.getMessages(workflowId)
# Document operations
document = interface.createDocument(documentData)
documents = interface.getDocuments(messageId)
2. interfaceDbAppAccess & interfaceDbAppObjects
Purpose: Application database operations
Key Classes:
- UserInDB: User accounts
- VoiceSettings: Voice configuration
- Connection: External connections
3. interfaceDbComponentAccess & interfaceDbComponentObjects
Purpose: Component database operations (files, data storage)
Key Classes:
- FileItem: File metadata
- FileData: File content
Key Functions:
# File operations
fileItem = interface.createFile(name, mimeType, content)
fileData = interface.getFileData(fileId)
interface.updateFileData(fileId, content)
4. interfaceAiObjects
Purpose: AI model management and calls
Key Functions:
# AI calls
response = await interface.call(request)
# Model selection
model = interface.getModelsByOperationType(operationType)
5. interfaceTicketObjects
Purpose: Ticket system interface
Key Classes:
- TicketBase: Base ticket interface
- TicketFieldAttribute: Ticket field definitions
Interface Pattern
Interfaces use a factory pattern:
def getInterface(user: User, workflow: ChatWorkflow = None) -> InterfaceType:
    """Factory function that returns appropriate implementation."""
    return InterfaceImplementation(user, workflow)
Connector Integration
Interfaces can have multiple connector implementations:
interfaceDbChat
├─ connectorDbPostgre.py
├─ connectorDbJson.py
└─ (future connectors)
Each connector:
- Implements interface requirements
- Handles data source specifics
- Returns standardized data models
- Ensures interface compatibility
Imports Constraint
Interfaces layer imports:
- modules.datamodels.* - Data models
- Database/connector specific libraries
NO direct imports from:
- Services
- Workflows
- Methods
Layer 5: Connectors (@connectors/)
Purpose
Connectors integrate external systems and services. They implement interface contracts and provide standardized access to diverse data sources.
Naming Convention
Connector naming follows the pattern:
connector{InterfaceType}{Provider}.py
Examples:
- connectorDbPostgre.py - PostgreSQL database connector
- connectorDbJson.py - JSON file connector
- connectorTicketsJira.py - Jira ticket connector
- connectorTicketsClickup.py - ClickUp connector
- connectorVoiceGoogle.py - Google Voice connector
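The naming pattern can be checked or parsed with a regular expression. The sketch below assumes the interface type is a single capitalized word (Db, Tickets, Voice), which matches the examples above but is not stated as a rule in this document. The function name parseConnectorFileName is hypothetical.

```python
import re
from typing import Optional, Tuple

# Assumes the interface type is one capitalized word followed by the provider name.
CONNECTOR_PATTERN = re.compile(r"^connector([A-Z][a-z0-9]*)([A-Z][A-Za-z0-9]*)\.py$")


def parseConnectorFileName(fileName: str) -> Optional[Tuple[str, str]]:
    """Return (interfaceType, provider), or None if the name breaks the convention."""
    match = CONNECTOR_PATTERN.match(fileName)
    return (match.group(1), match.group(2)) if match else None
```

A check like this could run in a test suite over the connectors/ directory to catch misnamed files early.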
Connector Structure
Every connector must:
- Implement Interface Contract:
class ConnectorTicketsJira(TicketBase):
    def __init__(self, *, apiUsername, apiToken, apiUrl, ...):
        # Initialize connector
        pass
    async def read_tasks(self, *, limit: int = 0) -> list[dict]:
        # Implement interface method
        pass
    async def create_task(self, taskData: dict) -> dict:
        # Implement interface method
        pass
- Ensure Standardization:
- Return standardized data models
- Handle authentication consistently
- Provide error handling
- Log operations
- Respect Interface Requirements:
- Must implement all required methods
- Must return data in expected format
- Must handle errors gracefully
Connector Examples
Database Connectors
# PostgreSQL Connector
class ConnectorDbPostgre:
async def executeQuery(self, query: str, params: dict):
# Execute against PostgreSQL
pass
# JSON File Connector
class ConnectorDbJson:
async def executeQuery(self, query: str, params: dict):
# Execute against JSON files
pass
Ticket System Connectors
# Jira Connector
class ConnectorTicketsJira(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read Jira field attributes
        pass
    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read Jira tasks
        pass

# ClickUp Connector
class ConnectorTicketsClickup(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read ClickUp field attributes
        pass
    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read ClickUp tasks
        pass
Connector Registration
Connectors are registered dynamically:
# In interface
connectors = [
ConnectorTicketsJira(credentials),
ConnectorTicketsClickup(credentials)
]
Imports Constraint
Connectors layer imports:
- modules.datamodels.* - Data models
- modules.interfaces.* - Interface contracts
- External libraries (aiohttp, etc.)
NO direct imports from:
- Services
- Workflows
- Methods
Layer 6: Data Models (@datamodels/)
Purpose
Data models define the core data structures used throughout the system. They use Pydantic for validation and type safety.
Key Data Models
Chat Models (datamodelChat.py)
ChatWorkflow:
- id: str
- name: str
- status: str
- workflowMode: str # "React" or "Actionplan"
- currentRound: int
- currentTask: int
- currentAction: int
- totalTasks: int
- totalActions: int
- messages: List[ChatMessage]
- logs: List[ChatLog]
- stats: List[ChatStat]
ChatMessage:
- id: str
- workflowId: str
- role: str # "user" or "assistant"
- message: str
- documents: List[ChatDocument]
- documentsLabel: str
- roundNumber: int
- taskNumber: int
- actionNumber: int
- taskProgress: str
- actionProgress: str
TaskContext:
- task_step: TaskStep
- workflow: ChatWorkflow
- workflow_id: str
- previous_results: List[str]
- available_documents: List[ChatDocument]
- improvements: List[str]
- retry_count: int
TaskStep:
- objective: str
- description: str
- criteria: List[str]
TaskPlan:
- tasks: List[TaskStep]
- summary: str
ActionResult:
- success: bool
- documents: List[ActionDocument]
- error: str (optional)
AI Models (datamodelAi.py)
AiModel:
- name: str
- displayName: str
- connectorType: str
- contextLength: int
- qualityRating: int # 1-10
- speedRating: int # 1-10
- costPer1kTokensInput: float
- costPer1kTokensOutput: float
- operationTypes: List[OperationTypeRating]
AiCallOptions:
- operationType: OperationTypeEnum
- resultFormat: str
- processingMode: ProcessingModeEnum
- priority: PriorityEnum
- modelName: str (optional)
AiCallRequest:
- prompt: str
- context: str (optional)
- options: AiCallOptions
- placeholders: List[PromptPlaceholder] (optional)
AiCallResponse:
- content: str
- modelName: str
- priceUsd: float
- processingTime: float
- bytesSent: int
- bytesReceived: int
- errorCount: int
Extraction Models (datamodelExtraction.py)
ContentPart:
- label: str
- data: str
- metadata: ContentMetadata
- typeGroup: str
ContentExtracted:
- id: str
- parts: List[ContentPart]
ContentMetadata:
- size: int
- mimeType: str
- pages: int (optional)
- error: str (optional)
MergeStrategy:
- mergeType: str
- config: dict
Data Model Usage
Data models are imported across all layers:
# In any layer
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage
from modules.datamodels.datamodelAi import AiCallOptions
AI Core System (@aicore/)
Purpose
The AI core provides dynamic model selection based on KPIs, business requirements, and operation types. It harmonizes model parameters for optimal performance.
Key Components
aicoreModelRegistry.py
Purpose: Centralized model registry for all AI connectors
Key Functions:
# Discover and register connectors
connectors = modelRegistry.discoverConnectors()
modelRegistry.registerConnector(connector)
# Get available models
availableModels = modelRegistry.getAvailableModels()
# Get models by operation type
models = modelRegistry.getModelsByOperationType(operationType)
# Get specific model
model = modelRegistry.getModel(modelName)
aicoreModelSelector.py
Purpose: Intelligent model selection based on requirements
Selection Criteria:
- Operation Type: Primary filter (must match)
- Context Length: Must fit within model limits
- Prompt Size: Checks if prompt fits
- Processing Mode: Basic/Advanced/Detailed
- Priority: Speed/Quality/Cost/Balanced
Selection Algorithm:
def selectModel(prompt, context, options, availableModels):
    # Step 1: Filter by operation type
    operationFiltered = filterByOperationType(availableModels, options)
    # Step 2: Filter by prompt size
    promptFiltered = filterByPromptSize(operationFiltered, prompt)
    # Step 3: Calculate scores
    scoredModels = calculateScores(promptFiltered, prompt, context, options)
    # Step 4: Sort by score, best first
    sortedModels = sortByScore(scoredModels)
    # Guard against an empty result before indexing
    if not sortedModels:
        raise ValueError("No model matches the operation type and prompt size")
    return sortedModels[0]  # Best model
Scoring Formula:
score = (
    operationTypeRating * 1000.0 +  # Primary
    sizeRating +                    # Context fit
    processingModeRating +          # Mode compatibility
    priorityRating                  # Speed/Quality/Cost
)
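To make the weighting concrete, here is a runnable sketch of the formula on a reduced model record. Several parts are assumptions made for illustration: ModelInfo is a stand-in for AiModel, sizeRating is taken as the free share of the context window, prompt size and context length are assumed to use the same unit, the BALANCED case averages speed and quality, and processingModeRating and the COST priority are left out because the document does not define how they are computed.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ModelInfo:
    """Reduced stand-in for AiModel with only the fields scored here."""
    name: str
    contextLength: int
    qualityRating: int                 # 1-10
    speedRating: int                   # 1-10
    operationRatings: Dict[str, int]   # operation type -> rating 1-10


def scoreModel(model: ModelInfo, operationType: str, promptSize: int, priority: str) -> float:
    """Score one model; the operation rating dominates through the 1000x weight."""
    operationTypeRating = model.operationRatings[operationType]
    # Assumed size rating: share of the context window left free, in [0, 1].
    sizeRating = 1.0 - promptSize / model.contextLength
    if priority == "SPEED":
        priorityRating = model.speedRating / 10.0
    elif priority == "QUALITY":
        priorityRating = model.qualityRating / 10.0
    else:
        # Assumed BALANCED behaviour: average of both ratings.
        priorityRating = (model.speedRating + model.qualityRating) / 20.0
    return operationTypeRating * 1000.0 + sizeRating + priorityRating


def rankModels(models: List[ModelInfo], operationType: str, promptSize: int, priority: str) -> List[ModelInfo]:
    """Drop models that cannot run the request, then sort best-first."""
    candidates = [
        m for m in models
        if operationType in m.operationRatings and promptSize <= m.contextLength
    ]
    return sorted(
        candidates,
        key=lambda m: scoreModel(m, operationType, promptSize, priority),
        reverse=True,
    )
```

Because the secondary terms stay well below 1000, they only break ties between models with the same operation rating. A model rated one point higher for the operation always ranks first.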
aicorePlugin*.py
AI connectors for different providers:
- aicorePluginOpenai: OpenAI models (GPT-4, etc.)
- aicorePluginAnthropic: Claude models
- aicorePluginPerplexity: Perplexity AI
- aicorePluginTavily: Tavily search
- aicorePluginInternal: Internal models
Connector Interface:
class BaseConnectorAi(ABC):
    def getModels(self) -> List[AiModel]:
        """Return all available models from this connector."""
        pass
    def getConnectorType(self) -> str:
        """Return connector type identifier."""
        pass
Dynamic Model Selection
Selection Process:
1. Analyze request (prompt, context, operation type)
2. Get all available models from registry
3. Filter by operation type (MUST match)
4. Filter by size constraints
5. Score each model
6. Select best model
7. Execute with failover to backup models
Failover Mechanism:
# Try models in priority order
for model in failoverModels:
    try:
        result = await callModel(model, prompt, context)
        return result
    except Exception:
        # Try next model
        continue
# If all models fail
raise Exception("All models failed")
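The failover loop can be exercised end to end with a stand-in provider call. This is a sketch under assumptions: callWithFailover, AllModelsFailedError and fakeCallModel are hypothetical names, and collecting the per-model error messages is an addition for illustration that the original loop does not show.

```python
import asyncio
from typing import Awaitable, Callable, List


class AllModelsFailedError(Exception):
    """Raised when every model in the failover list has failed."""


async def callWithFailover(
    modelNames: List[str],
    callModel: Callable[[str, str], Awaitable[str]],
    prompt: str,
) -> str:
    """Try each model in order and return the first successful result."""
    errors: List[str] = []
    for modelName in modelNames:
        try:
            return await callModel(modelName, prompt)
        except Exception as exc:  # broad on purpose: any failure triggers failover
            errors.append(f"{modelName}: {exc}")
    raise AllModelsFailedError("; ".join(errors) or "no models supplied")


async def fakeCallModel(modelName: str, prompt: str) -> str:
    """Hypothetical provider call: the model named 'primary' is always down."""
    if modelName == "primary":
        raise ConnectionError("provider unavailable")
    return f"{modelName} answered: {prompt}"
```

Keeping the individual error messages makes the final exception more useful for debugging than a bare "All models failed".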
Operation Types
Models are rated for different operation types:
- GENERATE: Text generation
- ANALYZE: Text analysis
- EXTRACT: Information extraction
- PLAN: Task planning
- IMAGE_GENERATE: Image generation
- IMAGE_ANALYZE: Image analysis
Rating System:
- Each model has ratings per operation type (1-10)
- Higher rating = better for that operation
- Rating * 1000 is the primary sorting criterion
Processing Modes
- BASIC: Simple operations, fast processing
- ADVANCED: Moderate complexity
- DETAILED: High complexity, thorough analysis
Compatibility:
- Models can handle their mode or downgrade
- Mode compatibility affects scoring
Priority Handling
- BALANCED: Default, considers all factors
- SPEED: Optimize for speed
- QUALITY: Optimize for quality
- COST: Optimize for cost
Priority Scoring:
if priority == "SPEED":
    score += model.speedRating / 10.0
elif priority == "QUALITY":
    score += model.qualityRating / 10.0
elif priority == "COST":
    score += costOptimization(model)
Harmonized Parameters
The system harmonizes parameters across different model APIs:
- Temperature/sampling parameters
- Max tokens settings
- Context length handling
- Response format handling
Usage Example
# In service layer
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
# Get available models
availableModels = modelRegistry.getAvailableModels()
# Create options
options = AiCallOptions(
    operationType=OperationTypeEnum.GENERATE,
    resultFormat="json",
    processingMode=ProcessingModeEnum.DETAILED,
    priority=PriorityEnum.QUALITY
)
# Select best model
selectedModel = modelSelector.selectModel(
    prompt=prompt,
    context=context,
    options=options,
    availableModels=availableModels
)
# Execute with failover
response = await executeWithFailover(
    models=modelSelector.getFailoverModelList(...),
    prompt=prompt,
    context=context,
    options=options
)
Data Flow
Complete Request Flow
User Input
↓
WorkflowManager.workflowStart()
↓
WorkflowProcessor.generateTaskPlan()
↓
AI Planning Call (services.ai.callAiPlanning)
↓
Task Plan Created
↓
For each Task:
ActionPlan Generated
↓
For each Action:
Method Action Executed
↓
Service Layer Called
↓
Interface Layer Called
↓
Connector Executes
↓
Results Returned
↓
Results Aggregated
↓
Final Response Generated
Document Processing Flow
Source Document (PDF, DOCX, etc.)
↓
services.extraction.extractContent()
↓
Extractor (extractorPdf, extractorDocx, etc.)
↓
Chunker (if document too large)
↓
ContentExtracted (standard JSON structure)
↓
AI Processing (services.ai.callAiDocuments)
↓
Merger (merge AI results)
↓
Renderer (rendererHtml, rendererPdf, etc.)
↓
Destination Document (HTML, PDF, etc.)
Design Principles
1. Layered Architecture
Each layer has specific responsibilities and imports only from the layers named in its Imports Constraint, which in most cases are the layers below it.
2. Standardization
Standardized data models, interfaces, and method signatures enable plug & play components.
3. Dynamic Selection
AI models selected dynamically based on requirements, not hardcoded.
4. Failover Mechanism
Automatic failover to backup models ensures reliability.
5. Progress Tracking
Complete progress tracking throughout workflow execution.
6. Document Transformation
Standardized JSON intermediate format enables any-to-any document transformation.
7. Modularity
Clear separation between workflows, methods, services, interfaces, and connectors.
8. Type Safety
Pydantic models ensure type safety and validation across the system.
Development Guidelines
Adding a New Method
- Create methodX.py in modules/workflows/methods/
- Inherit from MethodBase
- Implement @action decorated methods
- Follow parameter principles
- Delegate to services layer
class MethodX(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "x"
        self.description = "X operations"

    @action
    async def operation(self, parameters):
        # Extract parameters
        # Call services layer
        # Return ActionResult
        pass
Adding a New Service
- Create serviceX/ directory
- Implement mainServiceX.py with public API
- Add sub-modules as needed
- Register in services/__init__.py
# In services/__init__.py
from .serviceX.mainServiceX import ServiceX
self.x = PublicService(ServiceX(self))
Adding a New Connector
- Create connectorInterfaceXProvider.py in connectors/
- Implement interface contract
- Ensure standardization
- Handle errors gracefully
- Register in interface
Adding a New Extractor/Renderer
- Create extractorX.py or rendererX.py
- Register in ExtractorRegistry or RendererRegistry
- Implement interface:
  - extract(documentBytes, fileName, mimeType) -> ContentExtracted
  - or render(contentData, options) -> bytes
Error Handling
Workflow Errors
- Caught at workflow processor level
- Workflow status set to "failed"
- Error logged and returned to user
Action Errors
- Caught at method level
- ActionResult returned with success=False
- Error message in error field
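The action-level handling described under Action Errors could be centralized in a decorator so that each action does not repeat the same try/except. This is a sketch under assumptions: catchActionErrors is a hypothetical helper, and the ActionResult dataclass is a reduced stand-in for the real data model.

```python
import asyncio
import functools
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class ActionResult:
    """Reduced stand-in for the ActionResult data model."""
    success: bool
    documents: List[Any] = field(default_factory=list)
    error: Optional[str] = None


def catchActionErrors(
    func: Callable[..., Awaitable[ActionResult]]
) -> Callable[..., Awaitable[ActionResult]]:
    """Hypothetical decorator: turn any exception into a failed ActionResult."""
    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> ActionResult:
        try:
            return await func(*args, **kwargs)
        except Exception as exc:
            return ActionResult(success=False, error=f"{type(exc).__name__}: {exc}")
    return wrapper


@catchActionErrors
async def readMails(parameters: Dict[str, Any]) -> ActionResult:
    """Toy action that fails when the required parameter is missing."""
    mailboxName = parameters["mailboxName"]  # KeyError if absent
    return ActionResult(success=True, documents=[f"mail from {mailboxName}"])
```

The caller always receives an ActionResult, so the workflow processor can decide on retries or improvements by inspecting success and error instead of catching exceptions from every method.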
Service Errors
- Caught at service level
- Logged with context
- Re-raised or handled gracefully
Connector Errors
- Caught at connector level
- Retry with next connector (if applicable)
- Error logged and escalated
Logging
Log Levels
- INFO: Normal operation flow
- DEBUG: Detailed debugging information
- WARNING: Non-critical issues
- ERROR: Critical errors
Log Structure
timestamp | level | module | message | context
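One way to produce this layout is with Python's standard logging module, sketched below. The document does not say which logging library the system uses, so the use of logging, the buildLogger helper and the context attribute passed through extra are all assumptions.

```python
import io
import logging

# Pipe-separated layout mirroring the structure above; the `context` attribute
# is supplied per call through `extra` and is an assumption of this sketch.
LOG_FORMAT = "%(asctime)s | %(levelname)s | %(name)s | %(message)s | %(context)s"


def buildLogger(name: str, stream: io.StringIO) -> logging.Logger:
    """Create a logger that writes the pipe-separated format to `stream`."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger = logging.getLogger(name)
    logger.handlers.clear()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = buildLogger("modules.workflows.workflowManager", buffer)
log.info("Task completed", extra={"context": "workflowId=wf-1"})
```

With this format every call has to pass a context value through extra. A logging.LoggerAdapter or a filter that sets a default would remove that requirement.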
Workflow Logs
self.services.workflow.storeLog(workflow, {
    "message": "Task completed",
    "type": "info",
    "status": "success",
    "progress": 100
})
Testing
Unit Tests
Test individual components in isolation.
Integration Tests
Test interaction between components.
Workflow Tests
Test complete workflow execution.
Mock Services
Use mock services for testing without external dependencies.
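Since methods reach all functionality through self.services, a test can swap in a fake service container. A minimal sketch using unittest.mock.AsyncMock from the standard library follows. MethodSummary and buildMockServices are hypothetical names, and the keyword arguments passed to callAiDocuments follow the example in the service catalog rather than a confirmed signature.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict
from unittest.mock import AsyncMock


class MethodSummary:
    """Hypothetical method that delegates to services.ai.callAiDocuments."""

    def __init__(self, services: Any):
        self.services = services

    async def summarize(self, parameters: Dict[str, Any]) -> str:
        return await self.services.ai.callAiDocuments(
            prompt=parameters["aiPrompt"],
            documents=parameters.get("documentList", []),
        )


def buildMockServices() -> SimpleNamespace:
    """Service container whose AI call is an AsyncMock instead of a real model call."""
    ai = SimpleNamespace(callAiDocuments=AsyncMock(return_value="mock summary"))
    return SimpleNamespace(ai=ai)


services = buildMockServices()
result = asyncio.run(MethodSummary(services).summarize({"aiPrompt": "Summarize"}))
```

The mock records how it was awaited, so a test can assert both the returned value and the arguments the method passed to the service layer.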
Performance Considerations
Async/Await
All I/O operations are async for optimal performance.
Lazy Initialization
Services initialized lazily to reduce startup time.
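Lazy initialization can be sketched with functools.cached_property, which builds a service on first access and caches it on the container. The container and service classes below are hypothetical stand-ins, and the document does not state which mechanism the real services container uses.

```python
from functools import cached_property
from typing import List

CREATED: List[str] = []  # records construction order for the demonstration


class ServiceAi:
    def __init__(self) -> None:
        CREATED.append("ai")


class ServiceUtils:
    def __init__(self) -> None:
        CREATED.append("utils")


class Services:
    """Hypothetical container: each service is built on first access, then cached."""

    @cached_property
    def ai(self) -> ServiceAi:
        return ServiceAi()

    @cached_property
    def utils(self) -> ServiceUtils:
        return ServiceUtils()
```

Creating the container does no work. Only the services a workflow actually touches are constructed, and each is constructed once.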
Caching
Model registry caches connector instances.
Chunking
Large documents chunked for efficient processing.
Failover
Automatic failover ensures reliability without sacrificing performance.
Security
Access Control
User permission checks at interface level.
Data Validation
Pydantic models validate all inputs.
Sanitization
Prompt content sanitized to prevent injection.
Encryption
Sensitive data encrypted at rest and in transit.
Conclusion
The PowerOn workflow system is a sophisticated, multi-layered architecture that provides:
- Intelligent Task Automation: AI-powered task and action planning
- Document Processing: Plug & play extractors and renderers for any format
- Service Catalog: Comprehensive business feature catalog
- Dynamic Model Selection: Optimal AI model selection based on requirements
- Standardized Interfaces: Consistent APIs across all components
- Extensibility: Easy to add new methods, services, connectors
The system emphasizes:
- Separation of concerns
- Standardization
- Modularity
- Type safety
- Performance
- Reliability