# Workflow System - Developer Documentation

## Overview

The PowerOn workflow system is a multi-layered architecture designed for intelligent task automation and document processing. The system processes user requests through a structured pipeline involving workflow orchestration, AI-powered task planning, action execution via specialized methods, and comprehensive service integration.

## System Architecture

The workflow system is built on six distinct layers, each serving a specific purpose:

```
┌─────────────────────────────────────────────────────────────┐
│ Workflow Layer                                              │
│ (workflows/) - Orchestration & Execution Management         │
└─────────────────────────────────────────────────────────────┘
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ Method Layer                                                │
│ (methods/) - Action Implementation & Parameters             │
└─────────────────────────────────────────────────────────────┘
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ Service Layer                                               │
│ (services/) - Business Logic & Feature Catalog              │
└─────────────────────────────────────────────────────────────┘
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ Interface Layer                                             │
│ (interfaces/) - Standardized Component APIs                 │
└─────────────────────────────────────────────────────────────┘
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ Connector Layer                                             │
│ (connectors/) - External System Integration                 │
└─────────────────────────────────────────────────────────────┘
                               ↓
┌─────────────────────────────────────────────────────────────┐
│ Datamodels & AI Core                                        │
│ (datamodels/, aicore/) - Data Models & AI Orchestration     │
└─────────────────────────────────────────────────────────────┘
```

---

## Layer 1: Workflow System (@workflows/)

### Purpose

The workflow layer is the top-level orchestration system that manages task planning, execution, and state management. It coordinates between different workflow modes (React, Actionplan, etc.) and handles the complete workflow lifecycle.

### Key Components

#### workflowManager.py

Entry point for workflow operations.

**Key Functions:**
- `workflowStart()`: Initialize and start a workflow instance
- `workflowStop()`: Stop an active workflow
- `_workflowProcess()`: Main workflow execution loop
- `_planTasks()`: Generate task plans using AI
- `_executeTasks()`: Execute tasks sequentially

**Workflow States:**
- `running`: Workflow is actively processing
- `stopped`: User manually stopped the workflow
- `completed`: Workflow finished successfully
- `failed`: Workflow encountered an error

**Example:**
```python
# Start a new workflow
workflow = await workflowManager.workflowStart(
    userInput=UserInputRequest(prompt="Analyze sales data"),
    workflowMode="React"  # or "Actionplan"
)
```

#### Workflow Types

1. **React Mode**: Iterative task execution with up to 5 refinement steps
   - Best for exploratory tasks that may require multiple passes
   - Uses `maxSteps: 5` configuration

2. **Actionplan Mode**: Single-pass task execution with full action planning upfront
   - Best for well-defined tasks with clear objectives
   - Single execution path

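The difference between the two modes can be sketched as two small driver loops. This is a minimal illustration, not the code in `modes/modeReact.py` or `modes/modeActionplan.py`. The `planNext`, `execute`, `isComplete`, and `planAll` callables are hypothetical stand-ins. Only the limit of five steps comes from this document.

```python
from typing import Any, Awaitable, Callable, List

# Hypothetical callable types standing in for the real mode implementations.
PlanNext = Callable[[List[Any]], Awaitable[Any]]
Execute = Callable[[Any], Awaitable[Any]]
IsComplete = Callable[[List[Any]], bool]


async def runReact(planNext: PlanNext, execute: Execute,
                   isComplete: IsComplete, maxSteps: int = 5) -> List[Any]:
    """React-style loop: plan one step, execute it, re-plan with the results so far."""
    results: List[Any] = []
    for _ in range(maxSteps):
        step = await planNext(results)       # planning sees all previous results
        results.append(await execute(step))
        if isComplete(results):              # stop early once the objective is met
            break
    return results


async def runActionplan(planAll: Callable[[], Awaitable[List[Any]]],
                        execute: Execute) -> List[Any]:
    """Actionplan-style run: plan everything upfront, then execute in a single pass."""
    plan = await planAll()
    return [await execute(step) for step in plan]
```

In the React loop, each planning call sees the results gathered so far, which is what allows refinement between steps. The Actionplan variant commits to the whole plan before executing anything.
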
**Workflow Lifecycle:**
```
start → analyzeUserInput → planTasks → executeTasks → generateResults → complete
```

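The lifecycle can be tied to the four workflow states in a small driver. This is a hedged sketch, not the actual `_workflowProcess()`. The stage names follow the diagram and the status values follow the documented states. The `runLifecycle` function, the shared `state` dictionary, and the `stopRequested` callback are assumptions made for illustration.

```python
from enum import Enum
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class WorkflowStatus(str, Enum):
    RUNNING = "running"
    STOPPED = "stopped"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical contract: each stage receives the shared state dict and returns it.
Stage = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def runLifecycle(stages: List[Tuple[str, Stage]],
                       stopRequested: Callable[[], bool] = lambda: False
                       ) -> Tuple[WorkflowStatus, Dict[str, Any]]:
    state: Dict[str, Any] = {"status": WorkflowStatus.RUNNING}
    for name, stage in stages:
        if stopRequested():                   # e.g. the user called workflowStop()
            state["status"] = WorkflowStatus.STOPPED
            return state["status"], state
        try:
            state = await stage(state)
        except Exception as exc:              # any stage error fails the workflow
            state["status"] = WorkflowStatus.FAILED
            state["error"] = f"{name}: {exc}"
            return state["status"], state
    state["status"] = WorkflowStatus.COMPLETED
    return state["status"], state
```

A caller would pass the stages in diagram order, for example `[("analyzeUserInput", analyze), ("planTasks", plan), ...]`. Checking for a stop request between stages means a stop takes effect at the next stage boundary rather than interrupting a stage in progress.
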
#### Processing Layer (processing/)

The processing layer handles mode-specific execution logic:

- `workflowProcessor.py`: Main processor delegating to mode implementations
- `modes/modeBase.py`: Base class for all workflow modes
- `modes/modeReact.py`: React mode implementation
- `modes/modeActionplan.py`: Actionplan mode implementation

**Task Execution Flow:**
```python
# Task planning
taskPlan = await processor.generateTaskPlan(userInput, workflow)

# Task execution
for task in taskPlan.tasks:
    result = await processor.executeTask(task, workflow, context)
    # Process results and handover
```

### Task and Action Planning

The system operates on a two-level planning hierarchy:

1. **Task Planning**: High-level objective breakdown
   - Each task represents a major objective
   - Tasks are planned upfront (Actionplan) or iteratively (React)

2. **Action Planning**: Low-level operation sequencing
   - Actions are specific operations to accomplish a task
   - Actions are executed via method implementations

**TaskContext Structure:**
```python
TaskContext:
    - task_step: TaskStep
    - workflow: ChatWorkflow
    - workflow_id: str
    - previous_results: List[str]
    - improvements: List[str]
    - retry_count: int
    # ... other context fields
```

### Imports Constraint

Workflows layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.services.*` - Service layer
- `modules.workflows.*` - Internal workflow components

**NO direct imports from:**
- Interfaces
- Connectors
- AI core (use via `services.ai`)

---

## Layer 2: Methods (@methods/)

### Purpose

Methods implement executable actions. They define the interface between workflow planning and service execution, providing a standard way to perform operations with parameter handling.

### Method Structure

All methods inherit from `MethodBase`:

```python
from typing import Dict


class MethodBase:
    def __init__(self, services):
        self.services = services
        self.name = "method_name"
        self.description = "Method description"

    @property
    def actions(self) -> Dict[str, Dict]:
        # Auto-discovers @action decorated methods
        pass
```

### Action Decorator

Methods expose executable actions using the `@action` decorator:

```python
class MethodAi(MethodBase):
    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Process a user prompt with AI.

        Parameters:
        - aiPrompt (str, required): Instruction for the AI
        - documentList (list, optional): Document references
        - resultType (str, optional): Output format (txt, json, etc.)
        """
        aiPrompt = parameters.get("aiPrompt")
        documentList = parameters.get("documentList", [])
        resultType = parameters.get("resultType", "txt")

        # Call service layer
        result = await self.services.ai.callAiDocuments(...)

        # Return ActionResult
        return ActionResult.isSuccess(documents=[...])
```

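One way the `actions` property could discover `@action` methods is to have the decorator tag each function and then scan the class with `inspect`. This is a hedged sketch, not the project's actual `MethodBase`. The `_isAction` marker attribute and the shape of the returned dictionary (`description` taken from the docstring, plus a bound `method`) are assumptions.

```python
import inspect
from typing import Any, Callable, Dict


def action(func: Callable) -> Callable:
    """Mark a coroutine method as an executable action (hypothetical marker attribute)."""
    func._isAction = True
    return func


class MethodBase:
    def __init__(self, services: Any):
        self.services = services
        self.name = "method_name"
        self.description = "Method description"

    @property
    def actions(self) -> Dict[str, Dict[str, Any]]:
        """Collect every @action-decorated method, keyed by action name."""
        found: Dict[str, Dict[str, Any]] = {}
        # Scan the class (not the instance) so the property itself is not evaluated.
        for attrName, member in inspect.getmembers(type(self), inspect.isfunction):
            if getattr(member, "_isAction", False):
                found[attrName] = {
                    "description": inspect.getdoc(member) or "",
                    # Bound to this instance so callers can await it directly.
                    "method": getattr(self, attrName),
                }
        return found
```

Exposing the docstring alongside each action gives the planner a natural place to read the parameter documentation that the Parameter Principles below rely on.
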
### Parameter Principles

1. **Type Safety**: Parameters are typed and validated
2. **Required vs Optional**: Documented in docstrings
3. **Default Values**: Optional parameters have defaults
4. **Service Delegation**: Methods delegate to the services layer

### What Belongs in Methods vs Services

**Methods handle:**
- Parameter extraction and validation
- Action-specific orchestration
- Document packaging/unpackaging
- Error handling for action execution

**Services handle:**
- Core business logic
- Standardized operations
- Data transformation
- Cross-cutting concerns (logging, stats)

### Example Method Implementation

```python
class MethodOutlook(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "outlook"
        self.description = "Microsoft Outlook operations"

    @action
    async def readMails(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Read emails from Outlook mailbox.

        Parameters:
        - mailboxName (str, required): Mailbox to read from
        - filterCriteria (dict, optional): Email filter criteria
        """
        # Extract parameters
        mailboxName = parameters.get("mailboxName")
        filterCriteria = parameters.get("filterCriteria", {})

        # Delegate to service layer
        emails = await self.services.outlook.readEmails(
            mailbox=mailboxName,
            filter=filterCriteria
        )

        # Package results as documents
        documents = [ActionDocument(
            documentName=f"email_{i}.json",
            documentData=email.to_dict(),
            mimeType="application/json"
        ) for i, email in enumerate(emails)]

        return ActionResult.isSuccess(documents=documents)
```

### Available Methods

1. **methodAi**: AI processing and generation
   - `process`: General AI processing with documents
   - `webResearch`: Web research and crawling
   - `generateImage`: AI image generation

2. **methodOutlook**: Microsoft Outlook operations
   - `readMails`: Read emails
   - `sendMail`: Send emails
   - `manageCalendar`: Calendar operations

3. **methodSharepoint**: SharePoint operations
   - `readFiles`: Read files
   - `writeFiles`: Write files
   - `manageFolders`: Folder management

### Imports Constraint

Methods layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.services.*` - Service layer
- `modules.workflows.methods.*` - Base classes

**NO direct imports from:**
- Interfaces
- Connectors
- AI core
- Workflow orchestration

---

## Layer 3: Services (@services/)

### Purpose

The services layer provides a comprehensive catalog of business features. All services follow a consistent structure and are accessible via `self.services.xxx`.

### Services Architecture

Each service is structured as:

```
serviceX/
  - mainServiceX.py       # Public API
  - subCoreX.py           # Core business logic
  - subDocumentX.py       # Document-specific logic
  - subSharedX.py         # Shared utilities
```

### Service Catalog

#### 1. serviceAi - AI Operations
**Purpose**: AI calls, model selection, document processing

**Key Functions:**
```python
# AI document processing
await services.ai.callAiDocuments(
    prompt="Generate report",
    documents=[...],
    options=AiCallOptions(...)
)

# AI planning calls
await services.ai.callAiPlanning(
    prompt="Plan tasks",
    placeholders=None
)

# Image operations
await services.ai.readImage(prompt, imageData, mimeType)
await services.ai.generateImage(prompt, size, quality, style)
```

**Sub-modules:**
- `subCoreAi`: Core AI operations
- `subDocumentProcessing`: Document chunking and processing
- `subDocumentGeneration`: Single/multi-file generation
- `subSharedAiUtils`: Shared utilities

#### 2. serviceWorkflow - Workflow Management
**Purpose**: Workflow state, progress tracking, message handling

**Key Functions:**
```python
# Progress tracking
services.workflow.progressLogStart(operationId, stage, step, details)
services.workflow.progressLogUpdate(operationId, progress, message)
services.workflow.progressLogFinish(operationId, success)

# Message handling
services.workflow.storeMessageWithDocuments(workflow, messageData, documents)
services.workflow.getChatDocumentsFromDocumentList(documentList)

# Workflow state
services.workflow.storeLog(workflow, logEntry)
services.workflow.storeWorkflowStat(workflow, aiResponse, process)
```

#### 3. serviceExtraction - Document Extraction
**Purpose**: Extracting content from various document formats

**Features:**
- Multiple extractors (PDF, DOCX, XLSX, images, etc.)
- Chunking for large documents
- Merging strategies for AI processing

**Key Functions:**
```python
# Extract content
extracted = services.extraction.extractContent(
    documents=[...],
    options=ExtractionOptions(...)
)

# Merge AI results
merged = services.extraction.mergeAiResults(
    extractedContent=[...],
    aiResults=[...],
    strategy=MergeStrategy(...)
)
```

**Plug & Play Extractors:**
- `extractorPdf.py`: PDF extraction
- `extractorDocx.py`: DOCX extraction
- `extractorImage.py`: Image OCR
- `extractorXlsx.py`: Excel extraction
- `extractorCsv.py`: CSV parsing
- And more...

**Chunkers:**
- `chunkerText.py`: Text chunking
- `chunkerTable.py`: Table chunking
- `chunkerImage.py`: Image chunking
- `chunkerStructure.py`: Structural chunking

**Mergers:**
- `mergerText.py`: Text merging
- `mergerTable.py`: Table merging
- `mergerDefault.py`: Default merging

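Here is a minimal sketch of a text chunker and a matching text merger that makes the chunk-then-merge idea concrete. It is not `chunkerText.py` or `mergerText.py`. The character budget, the rule of preferring paragraph boundaries, and the function names are assumptions chosen for illustration.

```python
from typing import List


def chunkText(text: str, maxChars: int) -> List[str]:
    """Split text into chunks of at most maxChars, preferring paragraph boundaries."""
    chunks: List[str] = []
    current = ""
    for paragraph in text.split("\n\n"):
        # Hard-split any single paragraph that alone exceeds the budget.
        pieces = [paragraph[i:i + maxChars] for i in range(0, len(paragraph), maxChars)] or [""]
        for piece in pieces:
            candidate = f"{current}\n\n{piece}" if current else piece
            if len(candidate) <= maxChars:
                current = candidate           # still fits: keep growing this chunk
            else:
                chunks.append(current)        # full: emit it and start a new chunk
                current = piece
    if current:
        chunks.append(current)
    return chunks


def mergeText(aiResults: List[str]) -> str:
    """Merge per-chunk AI outputs back into one text, preserving chunk order."""
    return "\n\n".join(result.strip() for result in aiResults if result.strip())
```

Splitting on paragraph boundaries first keeps related sentences in the same AI call. The hard split is only a fallback for oversized paragraphs, so a real chunker would likely measure tokens rather than characters.
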
#### 4. serviceGeneration - Document Generation
**Purpose**: Generating documents in various formats

**Features:**
- Multiple renderers (JSON, HTML, PDF, DOCX, etc.)
- Template support
- Schema validation

**Key Functions:**
```python
# Process action results into documents
documents = services.generation.processActionResultDocuments(
    action_result,
    action,
    workflow
)

# Create documents from action results
created = services.generation.createDocumentsFromActionResult(
    action_result,
    action,
    workflow,
    message_id
)
```

**Plug & Play Renderers:**
- `rendererJson.py`: JSON rendering
- `rendererHtml.py`: HTML rendering
- `rendererPdf.py`: PDF generation
- `rendererDocx.py`: DOCX generation
- `rendererText.py`: Plain text
- And more...

#### 5. serviceWeb - Web Operations
**Purpose**: Web research, crawling, content extraction

**Key Functions:**
```python
# Perform web research
research = await services.web.performWebResearch(
    prompt="Find information",
    urls=[...],
    country="us",
    language="en",
    researchDepth="general"
)
```

#### 6. serviceTicket - Ticket System Integration
**Purpose**: Integration with ticket systems (Jira, ClickUp, etc.)

#### 7. serviceSharepoint - SharePoint Operations
**Purpose**: SharePoint file and folder management

#### 8. serviceNeutralization - Data Normalization
**Purpose**: Normalizing data from different sources

#### 9. serviceNormalization - Additional Normalization
**Purpose**: Extended normalization features

#### 10. serviceUtils - Utility Functions
**Purpose**: Timestamps, formatting, validation

**Key Functions:**
```python
# Get UTC timestamp
timestamp = services.utils.timestampGetUtc()

# Other utility functions
...
```

### Services Access Pattern

All services are accessed via `self.services.xxx`:

```python
# In a method
class MyMethod(MethodBase):
    @action
    async def myAction(self, parameters):
        # Access any service
        result = await self.services.ai.callAiDocuments(...)
        data = self.services.workflow.getFileInfo(fileId)
        timestamp = self.services.utils.timestampGetUtc()

        return ActionResult.isSuccess(...)
```

### Plug & Play Document Processing

The system supports plug & play extractors and renderers for document transformation:

**Extraction Flow:**
```
Source Document (PDF, DOCX, etc.)
  → Extractor (registered by MIME type)
  → Chunker (if document too large)
  → ContentExtracted (standard structure)
```

**Generation Flow:**
```
ContentExtracted (standard structure)
  → AI Processing
  → Merger (combine AI results)
  → Renderer (convert to target format)
  → Destination Document (JSON, HTML, PDF, etc.)
```

**Internal Data Structure:**
```python
ContentExtracted:
    - id: str
    - parts: List[ContentPart]
        - label: str
        - data: str
        - metadata: ContentMetadata
            - size, pages, mimeType, etc.

# Standard JSON structure for transformation
{
    "documentId": "...",
    "parts": [
        {
            "label": "text_content",
            "data": "...",
            "metadata": {...}
        }
    ]
}
```

**Transformation Matrix:**
```
Any Source Format (PDF, DOCX, XLSX, images, etc.)
        ↓
Standard JSON (ContentExtracted)
        ↓
Any Destination Format (JSON, HTML, PDF, DOCX, etc.)
```

The standardized JSON intermediate structure lets any source format that has an extractor be transformed into any destination format that has a renderer.

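The any-to-any claim rests on two registries that meet at the shared `ContentExtracted` structure. Below is a minimal sketch of that idea. The registry dictionaries, the `extractor`/`renderer` decorator names, the naive CSV extractor, and the plain-dict stand-in for `ContentExtracted` are all hypothetical. The real components are the `extractor*.py` and `renderer*.py` modules listed above.

```python
import html
import json
from typing import Any, Callable, Dict

Content = Dict[str, Any]  # plain-dict stand-in for ContentExtracted

EXTRACTORS: Dict[str, Callable[[bytes], Content]] = {}  # keyed by source MIME type
RENDERERS: Dict[str, Callable[[Content], str]] = {}     # keyed by target format


def extractor(mimeType: str):
    def register(func: Callable[[bytes], Content]):
        EXTRACTORS[mimeType] = func
        return func
    return register


def renderer(fmt: str):
    def register(func: Callable[[Content], str]):
        RENDERERS[fmt] = func
        return func
    return register


@extractor("text/csv")
def extractCsv(raw: bytes) -> Content:
    # Naive: one part per line, no CSV quoting rules.
    rows = raw.decode("utf-8").strip().splitlines()
    return {"documentId": "csv-1",
            "parts": [{"label": "table_row", "data": row, "metadata": {"mimeType": "text/csv"}}
                      for row in rows]}


@renderer("json")
def renderJson(content: Content) -> str:
    return json.dumps(content)


@renderer("html")
def renderHtml(content: Content) -> str:
    items = "".join(f"<p>{html.escape(part['data'])}</p>" for part in content["parts"])
    return f"<html><body>{items}</body></html>"


def transform(raw: bytes, sourceMime: str, targetFormat: str) -> str:
    """Source -> extractor -> standard structure -> renderer -> destination."""
    content = EXTRACTORS[sourceMime](raw)
    return RENDERERS[targetFormat](content)
```

With N extractors and M renderers, this design needs N + M components instead of N × M direct converters. An unsupported format surfaces as a `KeyError` from the registry lookup.
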
### Imports Constraint

Services layer imports ONLY from:
- `modules.datamodels.*` - Data models
- `modules.interfaces.*` - Interface layer
- `modules.aicore.*` - AI core (for AI services)
- `modules.services.*` - Other services

**NO direct imports from:**
- Connectors (use via interfaces)
- Workflow orchestration
- Methods

---

## Layer 4: Interfaces (@interfaces/)

### Purpose

Interfaces provide standardized APIs for accessing different data sources and components. They abstract away implementation details and enable pluggable connector systems.

### Interface Structure

Each interface defines:
1. **Standardized Methods**: Common operations across all implementations
2. **Data Models**: Interface-specific data models
3. **Access Control**: User permission management

### Available Interfaces

#### 1. interfaceDbChatAccess & interfaceDbChatObjects
**Purpose**: Chat system database operations

**Key Classes:**
- `ChatWorkflow`: Workflow instances
- `ChatMessage`: Messages in a workflow
- `ChatDocument`: Documents attached to messages
- `ChatLog`: Workflow logs
- `ChatStat`: Workflow statistics

**Key Functions:**
```python
# Workflow operations
workflow = interface.createWorkflow(workflowData)
workflow = interface.getWorkflow(workflowId)
interface.updateWorkflow(workflowId, updateData)

# Message operations
message = interface.createMessage(messageData)
messages = interface.getMessages(workflowId)

# Document operations
document = interface.createDocument(documentData)
documents = interface.getDocuments(messageId)
```

#### 2. interfaceDbAppAccess & interfaceDbAppObjects
**Purpose**: Application database operations

**Key Classes:**
- `UserInDB`: User accounts
- `VoiceSettings`: Voice configuration
- `Connection`: External connections

#### 3. interfaceDbComponentAccess & interfaceDbComponentObjects
**Purpose**: Component database operations (files, data storage)

**Key Classes:**
- `FileItem`: File metadata
- `FileData`: File content

**Key Functions:**
```python
# File operations
fileItem = interface.createFile(name, mimeType, content)
fileData = interface.getFileData(fileId)
interface.updateFileData(fileId, content)
```

#### 4. interfaceAiObjects
**Purpose**: AI model management and calls

**Key Functions:**
```python
# AI calls
response = await interface.call(request)

# Model selection
model = interface.getModelsByOperationType(operationType)
```

#### 5. interfaceTicketObjects
**Purpose**: Ticket system interface

**Key Classes:**
- `TicketBase`: Base ticket interface
- `TicketFieldAttribute`: Ticket field definitions

### Interface Pattern

Interfaces use a factory pattern:

```python
from typing import Optional


def getInterface(user: User, workflow: Optional[ChatWorkflow] = None) -> InterfaceType:
    """Factory function that returns the appropriate implementation."""
    return InterfaceImplementation(user, workflow)
```

### Connector Integration

Interfaces can have multiple connector implementations:

```
interfaceDbChat
├─ connectorDbPostgre.py
├─ connectorDbJson.py
└─ (future connectors)
```

Each connector:
- Implements interface requirements
- Handles data source specifics
- Returns standardized data models
- Ensures interface compatibility

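The contract between an interface and its connectors can be sketched with an abstract base class and two interchangeable backends. Everything here is hypothetical: `DbConnector`, `saveRecord`/`getRecord`, the in-memory backend, and the simplified `createWorkflow`/`getWorkflow` signatures illustrate the pattern. They are not the actual APIs of `connectorDbPostgre.py`, `connectorDbJson.py`, or `interfaceDbChatObjects`.

```python
import json
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class DbConnector(ABC):
    """Contract every database connector must fulfil (hypothetical)."""

    @abstractmethod
    def saveRecord(self, table: str, recordId: str, data: Dict[str, Any]) -> None: ...

    @abstractmethod
    def getRecord(self, table: str, recordId: str) -> Optional[Dict[str, Any]]: ...


class InMemoryConnector(DbConnector):
    def __init__(self) -> None:
        self._tables: Dict[str, Dict[str, Dict[str, Any]]] = {}

    def saveRecord(self, table, recordId, data):
        self._tables.setdefault(table, {})[recordId] = dict(data)

    def getRecord(self, table, recordId):
        return self._tables.get(table, {}).get(recordId)


class JsonFileConnector(DbConnector):
    """Stores each table as one JSON file inside a directory."""

    def __init__(self, directory: str) -> None:
        self._directory = directory

    def _path(self, table: str) -> str:
        return os.path.join(self._directory, f"{table}.json")

    def _load(self, table: str) -> Dict[str, Dict[str, Any]]:
        if not os.path.exists(self._path(table)):
            return {}
        with open(self._path(table), encoding="utf-8") as fh:
            return json.load(fh)

    def saveRecord(self, table, recordId, data):
        rows = self._load(table)
        rows[recordId] = dict(data)
        with open(self._path(table), "w", encoding="utf-8") as fh:
            json.dump(rows, fh)

    def getRecord(self, table, recordId):
        return self._load(table).get(recordId)


class ChatInterface:
    """Depends only on the contract, never on a concrete backend."""

    def __init__(self, connector: DbConnector) -> None:
        self._db = connector

    def createWorkflow(self, workflowId: str, name: str) -> Dict[str, Any]:
        record = {"id": workflowId, "name": name, "status": "running"}
        self._db.saveRecord("workflows", workflowId, record)
        return record

    def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
        return self._db.getRecord("workflows", workflowId)
```

Because `ChatInterface` only ever calls the two contract methods, swapping storage backends is a constructor argument, which is the property the connector list above relies on.
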
### Imports Constraint

Interfaces layer imports:
- `modules.datamodels.*` - Data models
- Database/connector specific libraries

**NO direct imports from:**
- Services
- Workflows
- Methods

---

## Layer 5: Connectors (@connectors/)

### Purpose

Connectors integrate external systems and services. They implement interface contracts and provide standardized access to diverse data sources.

### Naming Convention

Connector naming follows the pattern:
```
connector{InterfaceType}{Provider}.py
```

Examples:
- `connectorDbPostgre.py` - PostgreSQL database connector
- `connectorDbJson.py` - JSON file connector
- `connectorTicketsJira.py` - Jira ticket connector
- `connectorTicketsClickup.py` - ClickUp connector
- `connectorVoiceGoogle.py` - Google Voice connector

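The naming pattern is regular, so the interface type and provider can be recovered from a filename. That is one way dynamic registration could group connectors. The sketch below is hedged. Its regular expression assumes both parts are capitalized words and that the provider is the final capitalized word, which holds for the examples above but would not, for example, handle an all-caps provider name.

```python
import re
from typing import Optional, Tuple

# Assumed pattern: one or more capitalized words for the interface type,
# followed by exactly one capitalized word for the provider.
_CONNECTOR_FILE = re.compile(r"^connector((?:[A-Z][a-z0-9]*)+)([A-Z][a-z0-9]*)\.py$")


def parseConnectorFilename(filename: str) -> Optional[Tuple[str, str]]:
    """Return (interfaceType, provider), or None if the name breaks the convention."""
    match = _CONNECTOR_FILE.match(filename)
    return (match.group(1), match.group(2)) if match else None
```

Applied to a directory listing (for example via `os.listdir`), this groups `connectorTicketsJira.py` and `connectorTicketsClickup.py` under the `Tickets` interface type.
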
### Connector Structure

Every connector must:

1. **Implement Interface Contract**:
   ```python
   class ConnectorTicketsJira(TicketBase):
       def __init__(self, *, apiUsername, apiToken, apiUrl, ...):
           # Initialize connector
           pass

       async def read_tasks(self, *, limit: int = 0) -> list[dict]:
           # Implement interface method
           pass

       async def create_task(self, taskData: dict) -> dict:
           # Implement interface method
           pass
   ```

2. **Ensure Standardization**:
   - Return standardized data models
   - Handle authentication consistently
   - Provide error handling
   - Log operations

3. **Respect Interface Requirements**:
   - Must implement all required methods
   - Must return data in expected format
   - Must handle errors gracefully

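The "must implement all required methods" rule can be enforced by the language rather than by convention if the base class declares its methods abstract. Below is a minimal sketch. It assumes `TicketBase` is an `abc.ABC` whose methods mirror the `read_attributes`/`read_tasks` signatures shown in this section. The in-memory connector is hypothetical and returns plain dicts instead of `TicketFieldAttribute` objects, and treating `limit=0` as "no limit" is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class TicketBase(ABC):
    @abstractmethod
    async def read_attributes(self) -> List[Dict[str, Any]]: ...

    @abstractmethod
    async def read_tasks(self, *, limit: int = 0) -> List[Dict[str, Any]]: ...


class ConnectorTicketsMemory(TicketBase):
    """Hypothetical test connector backed by a Python list."""

    def __init__(self, tasks: List[Dict[str, Any]]) -> None:
        self._tasks = tasks

    async def read_attributes(self) -> List[Dict[str, Any]]:
        # Derive the field list from the keys present in the stored tasks.
        names = sorted({key for task in self._tasks for key in task})
        return [{"field": name} for name in names]

    async def read_tasks(self, *, limit: int = 0) -> List[Dict[str, Any]]:
        # Assumption: limit=0 means "no limit".
        return self._tasks[:limit] if limit > 0 else list(self._tasks)


class ConnectorTicketsIncomplete(TicketBase):
    """Forgets read_attributes, so it cannot be instantiated."""

    async def read_tasks(self, *, limit: int = 0) -> List[Dict[str, Any]]:
        return []
```

Instantiating `ConnectorTicketsIncomplete()` raises `TypeError`, so a connector that misses part of the contract fails at construction time instead of on the first call to the missing method.
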
### Connector Examples

#### Database Connectors

```python
# PostgreSQL Connector
class ConnectorDbPostgre:
    async def executeQuery(self, query: str, params: dict):
        # Execute against PostgreSQL
        pass
```

```python
# JSON File Connector
class ConnectorDbJson:
    async def executeQuery(self, query: str, params: dict):
        # Execute against JSON files
        pass
```

#### Ticket System Connectors

```python
# Jira Connector
class ConnectorTicketsJira(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read Jira field attributes
        pass

    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read Jira tasks
        pass
```

```python
# ClickUp Connector
class ConnectorTicketsClickup(TicketBase):
    async def read_attributes(self) -> List[TicketFieldAttribute]:
        # Read ClickUp field attributes
        pass

    async def read_tasks(self, *, limit: int = 0) -> List[dict]:
        # Read ClickUp tasks
        pass
```

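### Connector Registration

Connectors are registered dynamically:

```python
# In interface
# jiraCredentials / clickupCredentials are illustrative dicts of constructor arguments;
# ConnectorTicketsJira takes keyword-only arguments (apiUsername, apiToken, apiUrl, ...)
connectors = [
    ConnectorTicketsJira(**jiraCredentials),
    ConnectorTicketsClickup(**clickupCredentials)
]
```
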
### Imports Constraint

Connectors layer imports:
- `modules.datamodels.*` - Data models
- `modules.interfaces.*` - Interface contracts
- External libraries (aiohttp, etc.)

**NO direct imports from:**
- Services
- Workflows
- Methods

---

## Layer 6: Data Models (@datamodels/)

### Purpose

Data models define the core data structures used throughout the system. They use Pydantic for validation and type safety.

### Key Data Models

#### Chat Models (datamodelChat.py)

```python
ChatWorkflow:
    - id: str
    - name: str
    - status: str
    - workflowMode: str  # "React" or "Actionplan"
    - currentRound: int
    - currentTask: int
    - currentAction: int
    - totalTasks: int
    - totalActions: int
    - messages: List[ChatMessage]
    - logs: List[ChatLog]
    - stats: List[ChatStat]

ChatMessage:
    - id: str
    - workflowId: str
    - role: str  # "user" or "assistant"
    - message: str
    - documents: List[ChatDocument]
    - documentsLabel: str
    - roundNumber: int
    - taskNumber: int
    - actionNumber: int
    - taskProgress: str
    - actionProgress: str

TaskContext:
    - task_step: TaskStep
    - workflow: ChatWorkflow
    - workflow_id: str
    - previous_results: List[str]
    - available_documents: List[ChatDocument]
    - improvements: List[str]
    - retry_count: int

TaskStep:
    - objective: str
    - description: str
    - criteria: List[str]

TaskPlan:
    - tasks: List[TaskStep]
    - summary: str

ActionResult:
    - success: bool
    - documents: List[ActionDocument]
    - error: str (optional)
```

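Readers who want to experiment without the full project can approximate the task-related models above with standard-library dataclasses. This is deliberately a substitute: the real models are Pydantic classes with their own validation. The defaults shown are assumptions. So are the `ActionDocument` fields, which are taken from the Outlook example. `isSuccess` mirrors the `ActionResult.isSuccess(documents=...)` call used in the method examples, but its behaviour here is a guess. `isFailure` is a hypothetical counterpart.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class TaskStep:
    objective: str
    description: str = ""
    criteria: List[str] = field(default_factory=list)  # fresh list per instance


@dataclass
class TaskPlan:
    tasks: List[TaskStep]
    summary: str = ""


@dataclass
class ActionDocument:
    documentName: str
    documentData: Any
    mimeType: str


@dataclass
class ActionResult:
    success: bool
    documents: List[ActionDocument] = field(default_factory=list)
    error: Optional[str] = None

    @classmethod
    def isSuccess(cls, documents: Optional[List[ActionDocument]] = None) -> "ActionResult":
        return cls(success=True, documents=list(documents or []))

    @classmethod
    def isFailure(cls, error: str) -> "ActionResult":  # hypothetical helper
        return cls(success=False, error=error)
```

Using `default_factory` for list fields avoids the shared-mutable-default pitfall. Pydantic handles this case for you, but plain dataclasses do not.
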
#### AI Models (datamodelAi.py)

```python
AiModel:
    - name: str
    - displayName: str
    - connectorType: str
    - contextLength: int
    - qualityRating: int  # 1-10
    - speedRating: int  # 1-10
    - costPer1kTokensInput: float
    - costPer1kTokensOutput: float
    - operationTypes: List[OperationTypeRating]

AiCallOptions:
    - operationType: OperationTypeEnum
    - resultFormat: str
    - processingMode: ProcessingModeEnum
    - priority: PriorityEnum
    - modelName: str (optional)

AiCallRequest:
    - prompt: str
    - context: str (optional)
    - options: AiCallOptions
    - placeholders: List[PromptPlaceholder] (optional)

AiCallResponse:
    - content: str
    - modelName: str
    - priceCHF: float
    - processingTime: float
    - bytesSent: int
    - bytesReceived: int
    - errorCount: int
```

#### Extraction Models (datamodelExtraction.py)

```python
ContentPart:
    - label: str
    - data: str
    - metadata: ContentMetadata
    - typeGroup: str

ContentExtracted:
    - id: str
    - parts: List[ContentPart]

ContentMetadata:
    - size: int
    - mimeType: str
    - pages: int (optional)
    - error: str (optional)

MergeStrategy:
    - mergeType: str
    - config: dict
```

### Data Model Usage

Data models are imported across all layers:

```python
# In any layer
from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage
from modules.datamodels.datamodelAi import AiCallOptions
```

---

## AI Core System (@aicore/)

### Purpose

The AI core provides dynamic model selection based on KPIs, business requirements, and operation types. It harmonizes model parameters for optimal performance.

### Key Components

#### aicoreModelRegistry.py

**Purpose**: Centralized model registry for all AI connectors

**Key Functions:**
```python
# Discover and register connectors
connectors = modelRegistry.discoverConnectors()
modelRegistry.registerConnector(connector)

# Get available models
availableModels = modelRegistry.getAvailableModels()

# Get models by operation type
models = modelRegistry.getModelsByOperationType(operationType)

# Get specific model
model = modelRegistry.getModel(modelName)
```

#### aicoreModelSelector.py

**Purpose**: Intelligent model selection based on requirements

**Selection Criteria:**
1. **Operation Type**: Primary filter (must match)
2. **Context Length**: Must fit within model limits
3. **Prompt Size**: Checks if prompt fits
4. **Processing Mode**: Basic/Advanced/Detailed
5. **Priority**: Speed/Quality/Cost/Balanced

**Selection Algorithm:**
```python
def selectModel(prompt, context, options, availableModels):
    # Step 1: Filter by operation type
    operationFiltered = filterByOperationType(availableModels, options)

    # Step 2: Filter by prompt size
    promptFiltered = filterByPromptSize(operationFiltered, prompt)

    # Step 3: Calculate scores
    scoredModels = calculateScores(promptFiltered, prompt, context, options)

    # Step 4: Sort by score
    sortedModels = sortByScore(scoredModels)

    return sortedModels[0]  # Best model
```

**Scoring Formula:**
```
score = (
    operationTypeRating * 1000.0 +   # Primary
    sizeRating +                     # Context fit
    processingModeRating +           # Mode compatibility
    priorityRating                   # Speed/Quality/Cost
)
```

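The four steps and the scoring formula can be combined into a small, runnable selector. This is a sketch under stated assumptions, not `aicoreModelSelector.py`. The `Model` class, the 4-characters-per-token estimate, the concrete `sizeRating` and `processingModeRating` formulas, the cost term, and the BALANCED blend are all hypothetical. Only the filter order, the `operationTypeRating * 1000.0` weighting, and the `speedRating / 10.0` / `qualityRating / 10.0` priority terms come from this document.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Model:
    name: str
    contextLength: int                     # tokens
    qualityRating: int                     # 1-10
    speedRating: int                       # 1-10
    costPer1kTokensInput: float
    operationRatings: Dict[str, int] = field(default_factory=dict)  # e.g. {"GENERATE": 9}
    maxProcessingMode: int = 3             # hypothetical: 1=BASIC, 2=ADVANCED, 3=DETAILED


def estimateTokens(text: str) -> int:
    return len(text) // 4 + 1              # rough heuristic (assumption)


def score(model: Model, tokens: int, operation: str, mode: int, priority: str) -> float:
    opRating = model.operationRatings.get(operation, 0)
    sizeRating = 1.0 - tokens / model.contextLength          # more headroom scores higher
    modeRating = 1.0 if model.maxProcessingMode >= mode else 0.0
    if priority == "SPEED":
        priorityRating = model.speedRating / 10.0
    elif priority == "QUALITY":
        priorityRating = model.qualityRating / 10.0
    elif priority == "COST":
        priorityRating = 1.0 / (1.0 + model.costPer1kTokensInput)  # cheaper scores higher
    else:  # BALANCED (assumed blend of speed and quality)
        priorityRating = (model.speedRating + model.qualityRating) / 20.0
    return opRating * 1000.0 + sizeRating + modeRating + priorityRating


def rankModels(models: List[Model], prompt: str, context: str, operation: str,
               mode: int = 1, priority: str = "BALANCED") -> List[Model]:
    tokens = estimateTokens(prompt + context)
    candidates = [m for m in models if m.operationRatings.get(operation, 0) > 0]  # step 1
    candidates = [m for m in candidates if tokens <= m.contextLength]            # step 2
    return sorted(candidates,                                                    # steps 3-4
                  key=lambda m: score(m, tokens, operation, mode, priority), reverse=True)
```

Returning the whole ranked list rather than only the first entry means the same ordering can double as a failover list. Because the non-operation terms in this sketch add up to at most 3, the ×1000 weight makes the operation-type rating decisive. A model rated 9 for `GENERATE` outranks one rated 8 even under `SPEED` priority when the second model is faster. The other terms only break ties between equal operation ratings.
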
#### aicorePlugin*.py

AI connectors for different providers:

- **aicorePluginOpenai**: OpenAI models (GPT-4, etc.)
- **aicorePluginAnthropic**: Claude models
- **aicorePluginPerplexity**: Perplexity AI
- **aicorePluginTavily**: Tavily search
- **aicorePluginInternal**: Internal models

**Connector Interface:**
```python
from abc import ABC, abstractmethod
from typing import List

from modules.datamodels.datamodelAi import AiModel


class BaseConnectorAi(ABC):
    @abstractmethod
    def getModels(self) -> List[AiModel]:
        """Return all available models from this connector."""

    @abstractmethod
    def getConnectorType(self) -> str:
        """Return connector type identifier."""
```

### Dynamic Model Selection

**Selection Process:**
```
1. Analyze request (prompt, context, operation type)
2. Get all available models from registry
3. Filter by operation type (MUST match)
4. Filter by size constraints
5. Score each model
6. Select best model
7. Execute with failover to backup models
```

**Failover Mechanism:**
```python
# Try models in priority order
for model in failoverModels:
    try:
        result = await callModel(model, prompt, context)
        return result
    except Exception:
        # Try next model
        continue

# If all models fail
raise Exception("All models failed")
```

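A slightly fuller version of the failover loop keeps the error from each attempt, so the final exception explains what went wrong. This is a hedged sketch. `callWithFailover`, `AllModelsFailedError`, and the single-argument `callModel` callable are hypothetical names, and collecting the per-model failures is a design choice, not documented behaviour.

```python
from typing import Any, Awaitable, Callable, List, Sequence, Tuple


class AllModelsFailedError(RuntimeError):
    """Raised when every model in the failover list has failed."""

    def __init__(self, failures: List[Tuple[str, Exception]]):
        self.failures = failures
        summary = "; ".join(f"{name}: {err}" for name, err in failures)
        super().__init__(f"All models failed ({summary})")


async def callWithFailover(models: Sequence[str],
                           callModel: Callable[[str], Awaitable[Any]]) -> Any:
    """Try each model in priority order and return the first successful result."""
    failures: List[Tuple[str, Exception]] = []
    for model in models:
        try:
            return await callModel(model)
        except Exception as exc:  # record the failure, then fall through to the next model
            failures.append((model, exc))
    raise AllModelsFailedError(failures)
```

The loop catches `Exception` rather than `BaseException`, so task cancellation still propagates immediately (`asyncio.CancelledError` is a `BaseException` subclass since Python 3.8) instead of silently moving on to the next model.
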
### Operation Types

Models are rated for different operation types:

- `GENERATE`: Text generation
- `ANALYZE`: Text analysis
- `EXTRACT`: Information extraction
- `PLAN`: Task planning
- `IMAGE_GENERATE`: Image generation
- `IMAGE_ANALYZE`: Image analysis

**Rating System:**
- Each model has ratings per operation type (1-10)
- Higher rating = better for that operation
- Rating * 1000 is the primary sorting criterion

### Processing Modes

- `BASIC`: Simple operations, fast processing
- `ADVANCED`: Moderate complexity
- `DETAILED`: High complexity, thorough analysis

**Compatibility:**
- Models can handle their mode or downgrade
- Mode compatibility affects scoring

### Priority Handling

- `BALANCED`: Default, considers all factors
- `SPEED`: Optimize for speed
- `QUALITY`: Optimize for quality
- `COST`: Optimize for cost

**Priority Scoring:**
```python
if priority == "SPEED":
    score += model.speedRating / 10.0
elif priority == "QUALITY":
    score += model.qualityRating / 10.0
elif priority == "COST":
    score += costOptimization(model)
```

### Harmonized Parameters

The system harmonizes parameters across different model APIs:

- Temperature/sampling parameters
- Max tokens settings
- Context length handling
- Response format handling

### Usage Example

```python
# In the service layer
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
# AiCallOptions, OperationTypeEnum, ProcessingModeEnum, PriorityEnum and
# executeWithFailover are assumed to be imported as well (paths not shown here).


# Hypothetical wrapper: `await` is only valid inside an async function.
async def generateDetailedJson(prompt, context):
    # Get available models
    availableModels = modelRegistry.getAvailableModels()

    # Create options
    options = AiCallOptions(
        operationType=OperationTypeEnum.GENERATE,
        resultFormat="json",
        processingMode=ProcessingModeEnum.DETAILED,
        priority=PriorityEnum.QUALITY,
    )

    # Select best model
    selectedModel = modelSelector.selectModel(
        prompt=prompt,
        context=context,
        options=options,
        availableModels=availableModels,
    )

    # Execute with failover
    response = await executeWithFailover(
        models=modelSelector.getFailoverModelList(...),
        prompt=prompt,
        context=context,
        options=options,
    )
    return response
```

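The failover step can be sketched as "try each model in order until one succeeds". This is an illustration, not the system's `executeWithFailover`: it assumes each model object exposes an async `call(prompt, context, options)` method, and `DemoModel` is a hypothetical test double.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def executeWithFailover(models, prompt, context=None, options=None):
    """Try each model in order; return the first successful response (sketch)."""
    errors = []
    for model in models:
        try:
            # Assumed model interface: async call(prompt, context, options)
            return await model.call(prompt, context, options)
        except Exception as exc:  # one failing model should not abort the request
            logger.warning("Model %s failed: %s; trying next", getattr(model, "name", model), exc)
            errors.append((model, exc))
    raise RuntimeError(f"All {len(models)} models failed: {errors}")


class DemoModel:
    # Hypothetical model double that can be told to fail
    def __init__(self, name, fail=False):
        self.name, self.fail = name, fail

    async def call(self, prompt, context, options):
        if self.fail:
            raise ConnectionError(f"{self.name} unavailable")
        return f"{self.name}: {prompt}"


result = asyncio.run(executeWithFailover(
    models=[DemoModel("primary", fail=True), DemoModel("backup")],
    prompt="Summarize", context=None, options=None))
print(result)  # backup: Summarize
```

Raising only after every model has failed keeps the error list available for diagnosis.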

---

## Data Flow

### Complete Request Flow

```
User Input
    ↓
WorkflowManager.workflowStart()
    ↓
WorkflowProcessor.generateTaskPlan()
    ↓
AI Planning Call (services.ai.callAiPlanning)
    ↓
Task Plan Created
    ↓
For each Task:
    ActionPlan Generated
        ↓
    For each Action:
        Method Action Executed
            ↓
        Service Layer Called
            ↓
        Interface Layer Called
            ↓
        Connector Executes
            ↓
        Results Returned
    ↓
Results Aggregated
    ↓
Final Response Generated
```

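The nested task/action loops in the request flow can be sketched as below. All function names are hypothetical stand-ins: `planTasks` for `WorkflowProcessor.generateTaskPlan()` plus the AI planning call, `planActions` for action-plan generation, and `executeAction` for the method → service → interface → connector chain. Actions run sequentially here, on the assumption that later actions may depend on earlier results.

```python
import asyncio


# Hypothetical stand-ins for the planning and execution steps in the diagram.
async def planTasks(userInput):
    return [f"task-{i}" for i in (1, 2)]


async def planActions(task):
    return [f"{task}/action-{i}" for i in (1, 2)]


async def executeAction(action):
    # In the real system: method action -> service -> interface -> connector
    return {"action": action, "success": True}


async def runWorkflow(userInput):
    results = []
    for task in await planTasks(userInput):
        for action in await planActions(task):
            results.append(await executeAction(action))
    # Aggregate the results into a final response
    succeeded = sum(r["success"] for r in results)
    return f"{succeeded}/{len(results)} actions succeeded"


print(asyncio.run(runWorkflow("Summarize the report")))  # 4/4 actions succeeded
```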
### Document Processing Flow

```
Source Document (PDF, DOCX, etc.)
    ↓
services.extraction.extractContent()
    ↓
Extractor (extractorPdf, extractorDocx, etc.)
    ↓
Chunker (if document too large)
    ↓
ContentExtracted (standard JSON structure)
    ↓
AI Processing (services.ai.callAiDocuments)
    ↓
Merger (merge AI results)
    ↓
Renderer (rendererHtml, rendererPdf, etc.)
    ↓
Destination Document (HTML, PDF, etc.)
```

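The chunk → process → merge part of this flow can be sketched for plain text. This is a simplified illustration, not the system's Chunker or Merger: it splits on paragraph breaks, hard-splits paragraphs longer than `maxChars`, and "merges" by joining the per-chunk results. `processChunk` is a hypothetical callable standing in for `services.ai.callAiDocuments`.

```python
def chunkText(text: str, maxChars: int) -> list:
    """Split text into chunks of at most maxChars, preferring paragraph breaks."""
    chunks, current = [], ""
    for para in text.split("\n\n"):
        candidate = f"{current}\n\n{para}" if current else para
        if len(candidate) <= maxChars:
            current = candidate
            continue
        if current:
            chunks.append(current)
        # A single paragraph longer than maxChars is hard-split
        while len(para) > maxChars:
            chunks.append(para[:maxChars])
            para = para[maxChars:]
        current = para
    if current:
        chunks.append(current)
    return chunks


def processDocument(text: str, maxChars: int, processChunk) -> str:
    # Process each chunk independently (e.g. one AI call per chunk), then merge
    partialResults = [processChunk(chunk) for chunk in chunkText(text, maxChars)]
    return "\n\n".join(partialResults)


print(chunkText("aaaa\n\nbbbb\n\n" + "c" * 10, 9))
# ['aaaa', 'bbbb', 'ccccccccc', 'c']
```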

---

## Design Principles

### 1. Layered Architecture
Each layer has specific responsibilities and imports only from layers below it.

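The layering rule lends itself to an automated check. The sketch below uses Python's `ast` module to flag imports that point upward. Its assumptions: layers live under `modules.<layer>` (methods under `modules.workflows.methods`), the ranks follow the architecture diagram, and relative imports stay inside their own package and are skipped. `LAYER_RANK`, `layerOf` and `findLayerViolations` are hypothetical names.

```python
import ast

# Layer order from the architecture diagram: 0 = top, higher = lower layer.
LAYER_RANK = {
    "workflows": 0,
    "methods": 1,
    "services": 2,
    "interfaces": 3,
    "connectors": 4,
    "datamodels": 5,
    "aicore": 5,
}


def layerOf(moduleName: str):
    parts = moduleName.split(".")
    if len(parts) < 2 or parts[0] != "modules":
        return None  # not a project module (stdlib or third party)
    if parts[1] == "workflows" and len(parts) > 2 and parts[2] == "methods":
        return "methods"  # methods live under modules/workflows/methods/
    return parts[1]


def findLayerViolations(source: str, ownLayer: str) -> list:
    """Return absolute imports in `source` that point to a higher layer."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        else:
            continue  # relative imports and other nodes are skipped
        for name in names:
            layer = layerOf(name)
            if layer in LAYER_RANK and LAYER_RANK[layer] < LAYER_RANK[ownLayer]:
                violations.append(name)
    return violations


serviceSource = (
    "from modules.aicore.aicoreModelRegistry import modelRegistry\n"
    "from modules.workflows.methods.methodX import MethodX\n"
)
print(findLayerViolations(serviceSource, "services"))
# ['modules.workflows.methods.methodX']
```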
### 2. Standardization
Standardized data models, interfaces, and method signatures make components plug-and-play.

### 3. Dynamic Selection
AI models are selected dynamically based on each request's requirements instead of being hardcoded.

### 4. Failover Mechanism
Automatic failover to backup models keeps requests running when an individual model fails.

### 5. Progress Tracking
Progress is tracked at every stage of workflow execution.

### 6. Document Transformation
A standardized JSON intermediate format enables any-to-any document transformation: the output of any extractor can be fed to any renderer.

### 7. Modularity
Workflows, methods, services, interfaces, and connectors are clearly separated.

### 8. Type Safety
Pydantic models provide type safety and validation across the system.

---

## Development Guidelines

### Adding a New Method

1. Create `methodX.py` in `modules/workflows/methods/`
2. Inherit from `MethodBase`
3. Implement `@action`-decorated methods
4. Follow the parameter principles
5. Delegate to the services layer

```python
# MethodBase and action are provided by the workflow methods package (import not shown).
class MethodX(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "x"                    # method identifier
        self.description = "X operations"  # human-readable description

    @action
    async def operation(self, parameters):
        # 1. Extract and validate parameters
        # 2. Delegate the actual work to the services layer
        # 3. Return an ActionResult
        pass
```

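A runnable version of this pattern needs stand-ins for the framework pieces. In the sketch below, `MethodBase`, `action`, and `ActionResult` are simplified hypothetical stubs (only `success` and `error` are documented fields of `ActionResult`; `data` is assumed), and `services.x.process` is a hypothetical service call. Errors are caught inside the action and reported through `ActionResult`, matching the action-level error handling described in this document.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


# --- Hypothetical stand-ins for the framework pieces ---
@dataclass
class ActionResult:
    success: bool
    data: Any = None  # assumed field name
    error: Optional[str] = None


def action(func):
    func.isAction = True  # mark the coroutine as an exposed action
    return func


class MethodBase:
    def __init__(self, services):
        self.services = services


# --- The new method ---
class MethodX(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "x"
        self.description = "X operations"

    @action
    async def operation(self, parameters: dict) -> ActionResult:
        text = parameters.get("text")
        if not text:
            return ActionResult(success=False, error="Missing parameter: text")
        try:
            # Delegate the actual work to the services layer
            result = await self.services.x.process(text)
            return ActionResult(success=True, data=result)
        except Exception as exc:  # action errors are reported, not raised
            return ActionResult(success=False, error=str(exc))


# Usage with a fake service
class FakeXService:
    async def process(self, text):
        return text.upper()


class FakeServices:
    x = FakeXService()


method = MethodX(FakeServices())
print(asyncio.run(method.operation({"text": "hello"})))
# ActionResult(success=True, data='HELLO', error=None)
```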
### Adding a New Service

1. Create a `serviceX/` directory
2. Implement `mainServiceX.py` with the public API
3. Add sub-modules as needed
4. Register the service in `services/__init__.py`

```python
# In services/__init__.py
from .serviceX.mainServiceX import ServiceX

# Inside the services container's __init__, where `self` is the container:
self.x = PublicService(ServiceX(self))
```

### Adding a New Connector

1. Create `connectorInterfaceXProvider.py` in `connectors/`
2. Implement the interface contract
3. Keep inputs and outputs in the standardized format
4. Handle errors gracefully
5. Register the connector in the interface

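The connector steps can be illustrated with an abstract base class as the contract. Everything here is a hypothetical sketch: `ConnectorBase`, `ConnectorError`, the class name `ConnectorInterfaceXProvider`, the request/response dict shapes, and the `CONNECTORS` registry list are assumptions, and `_send` is a placeholder for the real network call. The connector translates provider-specific failures into one common error type and always returns the same response shape.

```python
import asyncio
from abc import ABC, abstractmethod


class ConnectorError(Exception):
    """Common error type, so callers need not know provider-specific exceptions."""


class ConnectorBase(ABC):
    # Hypothetical interface contract that every provider connector implements
    @abstractmethod
    async def call(self, request: dict) -> dict: ...


class ConnectorInterfaceXProvider(ConnectorBase):
    async def call(self, request: dict) -> dict:
        try:
            raw = await self._send(request)
        except (ConnectionError, TimeoutError) as exc:
            # Handle errors gracefully: translate into the common error type
            raise ConnectorError(f"XProvider request failed: {exc}") from exc
        # Standardize: always return the same response shape
        return {"content": raw.get("text", ""), "provider": "xprovider"}

    async def _send(self, request: dict) -> dict:
        # Placeholder for the real network call
        return {"text": f"echo: {request.get('prompt', '')}"}


# Registration in the interface (hypothetical registry list)
CONNECTORS = [ConnectorInterfaceXProvider()]
print(asyncio.run(CONNECTORS[0].call({"prompt": "hi"})))
# {'content': 'echo: hi', 'provider': 'xprovider'}
```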
### Adding a New Extractor/Renderer

1. Create `extractorX.py` or `rendererX.py`
2. Register it in `ExtractorRegistry` or `RendererRegistry`
3. Implement the interface:
   - `extract(documentBytes, fileName, mimeType) -> ContentExtracted`
   - or `render(contentData, options) -> bytes`

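A minimal sketch of the registry idea using the two interface signatures above. `SimpleExtractorRegistry`, the simplified `ContentExtracted` dataclass, `extractPlainText`, and `renderSimpleHtml` are hypothetical stand-ins; the real `ExtractorRegistry`/`RendererRegistry` APIs and the standard JSON structure may differ. A renderer registry would follow the same lookup pattern.

```python
from dataclasses import dataclass, field
from html import escape
from typing import Callable, Dict


@dataclass
class ContentExtracted:
    # Simplified stand-in for the standard JSON structure
    fileName: str
    mimeType: str
    parts: list = field(default_factory=list)


class SimpleExtractorRegistry:
    def __init__(self):
        self._byMime: Dict[str, Callable] = {}

    def register(self, mimeType: str, extractor: Callable) -> None:
        self._byMime[mimeType] = extractor

    def extract(self, documentBytes: bytes, fileName: str, mimeType: str) -> ContentExtracted:
        if mimeType not in self._byMime:
            raise ValueError(f"No extractor registered for {mimeType}")
        return self._byMime[mimeType](documentBytes, fileName, mimeType)


def extractPlainText(documentBytes: bytes, fileName: str, mimeType: str) -> ContentExtracted:
    text = documentBytes.decode("utf-8")
    return ContentExtracted(fileName, mimeType, parts=[{"type": "text", "data": text}])


def renderSimpleHtml(contentData: ContentExtracted, options: dict) -> bytes:
    # Escape text so document content cannot inject HTML
    body = "".join(f"<p>{escape(p['data'])}</p>" for p in contentData.parts)
    return body.encode("utf-8")


registry = SimpleExtractorRegistry()
registry.register("text/plain", extractPlainText)
content = registry.extract(b"Hello <world>", "note.txt", "text/plain")
print(renderSimpleHtml(content, {}))  # b'<p>Hello &lt;world&gt;</p>'
```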

---

## Error Handling

### Workflow Errors
- Caught at the workflow processor level
- Workflow status set to `"failed"`
- Error logged and returned to the user

### Action Errors
- Caught at the method level
- `ActionResult` returned with `success=False`
- Error message stored in the `error` field

### Service Errors
- Caught at the service level
- Logged with context
- Re-raised or handled gracefully

### Connector Errors
- Caught at the connector level
- Retried with the next connector (if applicable)
- Error logged and escalated

---

## Logging

### Log Levels
- `INFO`: Normal operation flow
- `DEBUG`: Detailed debugging information
- `WARNING`: Non-critical issues
- `ERROR`: Critical errors

### Log Structure
```
timestamp | level | module | message | context
```

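With Python's standard `logging` module, this structure could be produced by a format string plus a filter that supplies a default `context`. The sketch uses the logger name (`%(name)s`) as the "module" column and passes context through `extra`; the logger name `workflows.demo` and the `-` placeholder are assumptions, not the system's configuration.

```python
import io
import logging


class ContextDefaultFilter(logging.Filter):
    """Give every record a `context` attribute so the format string never fails."""

    def filter(self, record: logging.LogRecord) -> bool:
        if not hasattr(record, "context"):
            record.context = "-"
        return True


def buildLogger(stream) -> logging.Logger:
    handler = logging.StreamHandler(stream)
    handler.addFilter(ContextDefaultFilter())
    handler.setFormatter(logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s | %(message)s | %(context)s"))
    logger = logging.getLogger("workflows.demo")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.propagate = False  # keep the demo output out of the root logger
    return logger


buffer = io.StringIO()
log = buildLogger(buffer)
log.info("Task completed", extra={"context": {"workflowId": "wf-1", "progress": 100}})
log.warning("Slow model response")
print(buffer.getvalue())
```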

### Workflow Logs
```python
self.services.workflow.storeLog(workflow, {
    "message": "Task completed",
    "type": "info",
    "status": "success",
    "progress": 100
})
```

---

## Testing

### Unit Tests
Test individual components in isolation.

### Integration Tests
Test the interaction between components.

### Workflow Tests
Test complete workflow execution end to end.

### Mock Services
Use mock services to test without external dependencies.

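Async service calls can be mocked with the standard library's `unittest.mock.AsyncMock`. In this sketch, `planWorkflow` is hypothetical code under test, and the argument and return shape of `callAiPlanning` are assumptions made for the example.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def planWorkflow(services, userInput):
    # Hypothetical code under test: asks the AI service for a task plan
    plan = await services.ai.callAiPlanning(userInput)
    return [task["title"] for task in plan["tasks"]]


# Replace the AI service with a mock so no model is called
services = MagicMock()
services.ai.callAiPlanning = AsyncMock(
    return_value={"tasks": [{"title": "Extract"}, {"title": "Summarize"}]})

titles = asyncio.run(planWorkflow(services, "Summarize report.pdf"))
assert titles == ["Extract", "Summarize"]
services.ai.callAiPlanning.assert_awaited_once_with("Summarize report.pdf")
```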

---

## Performance Considerations

### Async/Await
All I/O operations are async, so waiting on one operation does not block the others.

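The benefit shows up when independent I/O calls run concurrently. In this sketch, `fetchDocument` is a hypothetical stand-in for an I/O-bound call (connector request, AI call, file read); `asyncio.gather` overlaps the three waits and returns results in argument order.

```python
import asyncio
import time


async def fetchDocument(name: str, delay: float) -> str:
    # Stand-in for an I/O-bound call
    await asyncio.sleep(delay)
    return f"{name} loaded"


async def loadAll():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetchDocument("a.pdf", 0.2),
        fetchDocument("b.docx", 0.2),
        fetchDocument("c.xlsx", 0.2),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(loadAll())
print(results)  # ['a.pdf loaded', 'b.docx loaded', 'c.xlsx loaded']
# elapsed is close to 0.2 s rather than 0.6 s, because the waits overlap
```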
### Lazy Initialization
Services are initialized lazily (on first use) to reduce startup time.

### Caching
The model registry caches connector instances.

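Lazy initialization and instance caching can be combined in one small pattern: build a connector on first request, then reuse it. `ModelRegistrySketch` and `DemoConnector` are hypothetical; the actual model registry's API is not described here.

```python
class ModelRegistrySketch:
    """Hypothetical registry that creates connectors lazily and caches them."""

    def __init__(self, factories: dict):
        self._factories = factories  # provider name -> zero-argument constructor
        self._instances = {}         # cache: provider name -> connector instance

    def getConnector(self, provider: str):
        if provider not in self._instances:
            # Created on first use only (lazy), then reused (cached)
            self._instances[provider] = self._factories[provider]()
        return self._instances[provider]


created = []


class DemoConnector:
    def __init__(self):
        created.append(self)  # record each construction


registry = ModelRegistrySketch({"demo": DemoConnector})
print(len(created))  # 0: nothing is built at startup
first = registry.getConnector("demo")
second = registry.getConnector("demo")
print(len(created), first is second)  # 1 True
```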
### Chunking
Large documents are split into chunks for efficient processing.

### Failover
Automatic failover keeps requests succeeding when a model fails; backup models are only tried after a failure, so successful calls pay no extra cost.

---

## Security

### Access Control
User permissions are checked at the interface level.

### Data Validation
Pydantic models validate all inputs.

### Sanitization
Prompt content is sanitized to prevent injection.

### Encryption
Sensitive data is encrypted at rest and in transit.

---

## Conclusion

The PowerOn workflow system is a sophisticated, multi-layered architecture that provides:

- **Intelligent Task Automation**: AI-powered task and action planning
- **Document Processing**: Plug-and-play extractors and renderers for any format
- **Service Catalog**: Comprehensive business feature catalog
- **Dynamic Model Selection**: Optimal AI model selection based on requirements
- **Standardized Interfaces**: Consistent APIs across all components
- **Extensibility**: New methods, services, and connectors are easy to add

The system emphasizes:
- Separation of concerns
- Standardization
- Modularity
- Type safety
- Performance
- Reliability