# Workflow System - Developer Documentation ## Overview The PowerOn workflow system is a multi-layered architecture designed for intelligent task automation and document processing. The system processes user requests through a structured pipeline involving workflow orchestration, AI-powered task planning, action execution via specialized methods, and comprehensive service integration. ## System Architecture The workflow system is built on six distinct layers, each serving a specific purpose: ``` ┌─────────────────────────────────────────────────────────────┐ │ Workflow Layer │ │ (workflows/) - Orchestration & Execution Management │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Method Layer │ │ (methods/) - Action Implementation & Parameters │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Service Layer │ │ (services/) - Business Logic & Feature Catalog │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Interface Layer │ │ (interfaces/) - Standardized Component APIs │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Connector Layer │ │ (connectors/) - External System Integration │ └─────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────┐ │ Datamodels & AI Core │ │ (datamodels/, aicore/) - Data Models & AI Orchestration │ └─────────────────────────────────────────────────────────────┘ ``` --- ## Layer 1: Workflow System (@workflows/) ### Purpose The workflow layer is the top-level orchestration system that manages task planning, execution, and state management. It coordinates between different workflow modes (React, Actionplan, etc.) and handles the complete workflow lifecycle. ### Key Components #### workflowManager.py Entry point for workflow operations. **Key Functions:** - `workflowStart()`: Initialize and start a workflow instance - `workflowStop()`: Stop an active workflow - `_workflowProcess()`: Main workflow execution loop - `_planTasks()`: Generate task plans using AI - `_executeTasks()`: Execute tasks sequentially **Workflow States:** - `running`: Workflow is actively processing - `stopped`: User manually stopped the workflow - `completed`: Workflow finished successfully - `failed`: Workflow encountered an error **Example:** ```python # Start a new workflow workflow = await workflowManager.workflowStart( userInput=UserInputRequest(prompt="Analyze sales data"), workflowMode="React" # or "Actionplan" ) ``` #### Workflow Types 1. **React Mode**: Iterative task execution with up to 5 refinement steps - Best for exploratory tasks that may require multiple passes - Uses `maxSteps: 5` configuration 2. **Actionplan Mode**: Single-pass task execution with full action planning upfront - Best for well-defined tasks with clear objectives - Single execution path **Workflow Lifecycle:** ``` start → analyzeUserInput → planTasks → executeTasks → generateResults → complete ``` #### Processing Layer (processing/) The processing layer handles mode-specific execution logic: - `workflowProcessor.py`: Main processor delegating to mode implementations - `modes/modeBase.py`: Base class for all workflow modes - `modes/modeReact.py`: React mode implementation - `modes/modeActionplan.py`: Actionplan mode implementation **Task Execution Flow:** ```python # Task planning taskPlan = await processor.generateTaskPlan(userInput, workflow) # Task execution for task in taskPlan.tasks: result = await processor.executeTask(task, workflow, context) # Process results and handover ``` ### Task and Action Planning The system operates on a two-level planning hierarchy: 1. **Task Planning**: High-level objective breakdown - Each task represents a major objective - Tasks are planned upfront (Actionplan) or iteratively (React) 2. **Action Planning**: Low-level operation sequencing - Actions are specific operations to accomplish a task - Actions are executed via method implementations **TaskContext Structure:** ```python TaskContext( task_step: TaskStep, workflow: ChatWorkflow, workflow_id: str, previous_results: List[str], improvements: List[str], retry_count: int, # ... other context fields ) ``` ### Imports Constraint Workflows layer imports ONLY from: - `modules.datamodels.*` - Data models - `modules.services.*` - Service layer - `modules.workflows.*` - Internal workflow components **NO direct imports from:** - Interfaces - Connectors - AI core (use via services.ai) --- ## Layer 2: Methods (@methods/) ### Purpose Methods implement executable actions. They define the interface between workflow planning and service execution, providing a standard way to perform operations with parameter handling. ### Method Structure All methods inherit from `MethodBase`: ```python class MethodBase: def __init__(self, services): self.services = services self.name = "method_name" self.description = "Method description" @property def actions(self) -> Dict[str, Dict]: # Auto-discovers @action decorated methods pass ``` ### Action Decorator Methods expose executable actions using the `@action` decorator: ```python class MethodAi(MethodBase): @action async def process(self, parameters: Dict[str, Any]) -> ActionResult: """ Process a user prompt with AI. Parameters: - aiPrompt (str, required): Instruction for the AI - documentList (list, optional): Document references - resultType (str, optional): Output format (txt, json, etc.) """ aiPrompt = parameters.get("aiPrompt") documentList = parameters.get("documentList", []) resultType = parameters.get("resultType", "txt") # Call service layer result = await self.services.ai.callAiDocuments(...) # Return ActionResult return ActionResult.isSuccess(documents=[...]) ``` ### Parameter Principles 1. **Type Safety**: Parameters are typed and validated 2. **Required vs Optional**: Documented in docstrings 3. **Default Values**: Optional parameters have defaults 4. **Service Delegation**: Methods delegate to services layer ### What Belongs in Methods vs Services **Methods handle:** - Parameter extraction and validation - Action-specific orchestration - Document packaging/unpackaging - Error handling for action execution **Services handle:** - Core business logic - Standardized operations - Data transformation - Cross-cutting concerns (logging, stats) ### Example Method Implementation ```python class MethodOutlook(MethodBase): def __init__(self, services): super().__init__(services) self.name = "outlook" self.description = "Microsoft Outlook operations" @action async def readMails(self, parameters: Dict[str, Any]) -> ActionResult: """ Read emails from Outlook mailbox. Parameters: - mailboxName (str, required): Mailbox to read from - filterCriteria (dict, optional): Email filter criteria """ # Extract parameters mailboxName = parameters.get("mailboxName") filterCriteria = parameters.get("filterCriteria", {}) # Delegate to service layer emails = await self.services.outlook.readEmails( mailbox=mailboxName, filter=filterCriteria ) # Package results as documents documents = [ActionDocument( documentName=f"email_{i}.json", documentData=email.to_dict(), mimeType="application/json" ) for i, email in enumerate(emails)] return ActionResult.isSuccess(documents=documents) ``` ### Available Methods 1. **methodAi**: AI processing and generation - `process`: General AI processing with documents - `webResearch`: Web research and crawling - `generateImage`: AI image generation 2. **methodOutlook**: Microsoft Outlook operations - `readMails`: Read emails - `sendMail`: Send emails - `manageCalendar`: Calendar operations 3. **methodSharepoint**: SharePoint operations - `readFiles`: Read files - `writeFiles`: Write files - `manageFolders`: Folder management ### Imports Constraint Methods layer imports ONLY from: - `modules.datamodels.*` - Data models - `modules.services.*` - Service layer - `modules.workflows.methods.*` - Base classes **NO direct imports from:** - Interfaces - Connectors - AI core - Workflow orchestration --- ## Layer 3: Services (@services/) ### Purpose The services layer provides a comprehensive catalog of business features. All services follow a consistent structure and are accessible via `self.services.xxx`. ### Services Architecture Each service is structured as: ``` serviceX/ - mainServiceX.py # Public API - subCoreX.py # Core business logic - subDocumentX.py # Document-specific logic - subSharedX.py # Shared utilities ``` ### Service Catalog #### 1. serviceAi - AI Operations **Purpose**: AI calls, model selection, document processing **Key Functions:** ```python # AI document processing await services.ai.callAiDocuments( prompt="Generate report", documents=[...], options=AiCallOptions(...) ) # AI planning calls await services.ai.callAiPlanning( prompt="Plan tasks", placeholders=None ) # Image operations await services.ai.readImage(prompt, imageData, mimeType) await services.ai.generateImage(prompt, size, quality, style) ``` **Sub-modules:** - `subCoreAi`: Core AI operations - `subDocumentProcessing`: Document chunking and processing - `subDocumentGeneration`: Single/multi-file generation - `subSharedAiUtils`: Shared utilities #### 2. serviceWorkflow - Workflow Management **Purpose**: Workflow state, progress tracking, message handling **Key Functions:** ```python # Progress tracking services.workflow.progressLogStart(operationId, stage, step, details) services.workflow.progressLogUpdate(operationId, progress, message) services.workflow.progressLogFinish(operationId, success) # Message handling services.workflow.storeMessageWithDocuments(workflow, messageData, documents) services.workflow.getChatDocumentsFromDocumentList(documentList) # Workflow state services.workflow.storeLog(workflow, logEntry) services.workflow.storeWorkflowStat(workflow, aiResponse, process) ``` #### 3. serviceExtraction - Document Extraction **Purpose**: Extracting content from various document formats **Features:** - Multiple extractors (PDF, DOCX, XLSX, images, etc.) - Chunking for large documents - Merging strategies for AI processing **Key Functions:** ```python # Extract content extracted = services.extraction.extractContent( documents=[...], options=ExtractionOptions(...) ) # Merge AI results merged = services.extraction.mergeAiResults( extractedContent=[...], aiResults=[...], strategy=MergeStrategy(...) ) ``` **Plug & Play Extractors:** - `extractorPdf.py`: PDF extraction - `extractorDocx.py`: DOCX extraction - `extractorImage.py`: Image OCR - `extractorXlsx.py`: Excel extraction - `extractorCsv.py`: CSV parsing - And more... **Chunkers:** - `chunkerText.py`: Text chunking - `chunkerTable.py`: Table chunking - `chunkerImage.py`: Image chunking - `chunkerStructure.py`: Structural chunking **Mergers:** - `mergerText.py`: Text merging - `mergerTable.py`: Table merging - `mergerDefault.py`: Default merging #### 4. serviceGeneration - Document Generation **Purpose**: Generating documents in various formats **Features:** - Multiple renderers (JSON, HTML, PDF, DOCX, etc.) - Template support - Schema validation **Key Functions:** ```python # Process action results into documents documents = services.generation.processActionResultDocuments( action_result, action, workflow ) # Create documents from action results created = services.generation.createDocumentsFromActionResult( action_result, action, workflow, message_id ) ``` **Plug & Play Renderers:** - `rendererJson.py`: JSON rendering - `rendererHtml.py`: HTML rendering - `rendererPdf.py`: PDF generation - `rendererDocx.py`: DOCX generation - `rendererText.py`: Plain text - And more... #### 5. serviceWeb - Web Operations **Purpose**: Web research, crawling, content extraction **Key Functions:** ```python # Perform web research research = await services.web.performWebResearch( prompt="Find information", urls=[...], country="us", language="en", researchDepth="general" ) ``` #### 6. serviceTicket - Ticket System Integration **Purpose**: Integration with ticket systems (Jira, ClickUp, etc.) #### 7. serviceSharepoint - SharePoint Operations **Purpose**: SharePoint file and folder management #### 8. serviceNeutralization - Data Normalization **Purpose**: Normalizing data from different sources #### 9. serviceNormalization - Additional Normalization **Purpose**: Extended normalization features #### 10. serviceUtils - Utility Functions **Purpose**: Timestamps, formatting, validation **Key Functions:** ```python # Get UTC timestamp timestamp = services.utils.timestampGetUtc() # Other utility functions ... ``` ### Services Access Pattern All services are accessed via `self.services.xxx`: ```python # In a method class MyMethod(MethodBase): @action async def myAction(self, parameters): # Access any service result = await self.services.ai.callAiDocuments(...) data = self.services.workflow.getFileInfo(fileId) timestamp = self.services.utils.timestampGetUtc() return ActionResult.isSuccess(...) ``` ### Plug & Play Document Processing The system supports plug & play extractors and renderers for document transformation: **Extraction Flow:** ``` Source Document (PDF, DOCX, etc.) → Extractor (registered by MIME type) → Chunker (if document too large) → ContentExtracted (standard structure) ``` **Generation Flow:** ``` ContentExtracted (standard structure) → AI Processing → Merger (combine AI results) → Renderer (convert to target format) → Destination Document (JSON, HTML, PDF, etc.) ``` **Internal Data Structure:** ```python ContentExtracted: - id: str - parts: List[ContentPart] - label: str - data: str - metadata: ContentMetadata - size, pages, mimeType, etc. # Standard JSON structure for transformation { "documentId": "...", "parts": [ { "label": "text_content", "data": "...", "metadata": {...} } ] } ``` **Transformation Matrix:** ``` Any Source Format (PDF, DOCX, XLSX, images, etc.) ↓ Standard JSON (ContentExtracted) ↓ Any Destination Format (JSON, HTML, PDF, DOCX, etc.) ``` This enables transforming ANY document format to ANY other format through the standardized JSON intermediate structure. ### Imports Constraint Services layer imports ONLY from: - `modules.datamodels.*` - Data models - `modules.interfaces.*` - Interface layer - `modules.aicore.*` - AI core (for AI services) - `modules.services.*` - Other services **NO direct imports from:** - Connectors (use via interfaces) - Workflow orchestration - Methods --- ## Layer 4: Interfaces (@interfaces/) ### Purpose Interfaces provide standardized APIs for accessing different data sources and components. They abstract away implementation details and enable pluggable connector systems. ### Interface Structure Each interface defines: 1. **Standardized Methods**: Common operations across all implementations 2. **Data Models**: Interface-specific data models 3. **Access Control**: User permission management ### Available Interfaces #### 1. interfaceDbChatAccess & interfaceDbChatObjects **Purpose**: Chat system database operations **Key Classes:** - `ChatWorkflow`: Workflow instances - `ChatMessage`: Messages in a workflow - `ChatDocument`: Documents attached to messages - `ChatLog`: Workflow logs - `ChatStat`: Workflow statistics **Key Functions:** ```python # Workflow operations workflow = interface.createWorkflow(workflowData) workflow = interface.getWorkflow(workflowId) interface.updateWorkflow(workflowId, updateData) # Message operations message = interface.createMessage(messageData) messages = interface.getMessages(workflowId) # Document operations document = interface.createDocument(documentData) documents = interface.getDocuments(messageId) ``` #### 2. interfaceDbAppAccess & interfaceDbAppObjects **Purpose**: Application database operations **Key Classes:** - `UserInDB`: User accounts - `VoiceSettings`: Voice configuration - `Connection`: External connections #### 3. interfaceDbComponentAccess & interfaceDbComponentObjects **Purpose**: Component database operations (files, data storage) **Key Classes:** - `FileItem`: File metadata - `FileData`: File content **Key Functions:** ```python # File operations fileItem = interface.createFile(name, mimeType, content) fileData = interface.getFileData(fileId) interface.updateFileData(fileId, content) ``` #### 4. interfaceAiObjects **Purpose**: AI model management and calls **Key Functions:** ```python # AI calls response = await interface.call(request) # Model selection model = interface.getModelsByOperationType(operationType) ``` #### 5. interfaceTicketObjects **Purpose**: Ticket system interface **Key Classes:** - `TicketBase`: Base ticket interface - `TicketFieldAttribute`: Ticket field definitions ### Interface Pattern Interfaces use a factory pattern: ```python def getInterface(user: User, workflow: ChatWorkflow = None) -> InterfaceType: """Factory function that returns appropriate implementation.""" return InterfaceImplementation(user, workflow) ``` ### Connector Integration Interfaces can have multiple connector implementations: ``` interfaceDbChat ├─ connectorDbPostgre.py ├─ connectorDbJson.py └─ (future connectors) ``` Each connector: - Implements interface requirements - Handles data source specifics - Returns standardized data models - Ensures interface compatibility ### Imports Constraint Interfaces layer imports: - `modules.datamodels.*` - Data models - Database/connector specific libraries **NO direct imports from:** - Services - Workflows - Methods --- ## Layer 5: Connectors (@connectors/) ### Purpose Connectors integrate external systems and services. They implement interface contracts and provide standardized access to diverse data sources. ### Naming Convention Connector naming follows the pattern: ``` connector{InterfaceType}{Provider}.py ``` Examples: - `connectorDbPostgre.py` - PostgreSQL database connector - `connectorDbJson.py` - JSON file connector - `connectorTicketsJira.py` - Jira ticket connector - `connectorTicketsClickup.py` - ClickUp connector - `connectorVoiceGoogle.py` - Google Voice connector ### Connector Structure Every connector must: 1. **Implement Interface Contract**: ```python class ConnectorTicketsJira(TicketBase): def __init__(self, *, apiUsername, apiToken, apiUrl, ...): # Initialize connector pass async def read_tasks(self, *, limit: int = 0) -> list[dict]: # Implement interface method pass async def create_task(self, taskData: dict) -> dict: # Implement interface method pass ``` 2. **Ensure Standardization**: - Return standardized data models - Handle authentication consistently - Provide error handling - Log operations 3. **Respect Interface Requirements**: - Must implement all required methods - Must return data in expected format - Must handle errors gracefully ### Connector Examples #### Database Connectors ```python # PostgreSQL Connector class ConnectorDbPostgre: async def executeQuery(self, query: str, params: dict): # Execute against PostgreSQL pass ``` ```python # JSON File Connector class ConnectorDbJson: async def executeQuery(self, query: str, params: dict): # Execute against JSON files pass ``` #### Ticket System Connectors ```python # Jira Connector class ConnectorTicketsJira(TicketBase): async def read_attributes(self) -> List[TicketFieldAttribute]: # Read Jira field attributes pass async def read_tasks(self, *, limit: int = 0) -> List[dict]: # Read Jira tasks pass ``` ```python # ClickUp Connector class ConnectorTicketsClickup(TicketBase): async def read_attributes(self) -> List[TicketFieldAttribute]: # Read ClickUp field attributes pass async def read_tasks(self, *, limit: int = 0) -> List[dict]: # Read ClickUp tasks pass ``` ### Connector Registration Connectors are registered dynamically: ```python # In interface connectors = [ ConnectorTicketsJira(credentials), ConnectorTicketsClickup(credentials) ] ``` ### Imports Constraint Connectors layer imports: - `modules.datamodels.*` - Data models - `modules.interfaces.*` - Interface contracts - External libraries (aiohttp, etc.) **NO direct imports from:** - Services - Workflows - Methods --- ## Layer 6: Data Models (@datamodels/) ### Purpose Data models define the core data structures used throughout the system. They use Pydantic for validation and type safety. ### Key Data Models #### Chat Models (datamodelChat.py) ```python ChatWorkflow: - id: str - name: str - status: str - workflowMode: str # "React" or "Actionplan" - currentRound: int - currentTask: int - currentAction: int - totalTasks: int - totalActions: int - messages: List[ChatMessage] - logs: List[ChatLog] - stats: List[ChatStat] ChatMessage: - id: str - workflowId: str - role: str # "user" or "assistant" - message: str - documents: List[ChatDocument] - documentsLabel: str - roundNumber: int - taskNumber: int - actionNumber: int - taskProgress: str - actionProgress: str TaskContext: - task_step: TaskStep - workflow: ChatWorkflow - workflow_id: str - previous_results: List[str] - available_documents: List[ChatDocument] - improvements: List[str] - retry_count: int TaskStep: - objective: str - description: str - criteria: List[str] TaskPlan: - tasks: List[TaskStep] - summary: str ActionResult: - success: bool - documents: List[ActionDocument] - error: str (optional) ``` #### AI Models (datamodelAi.py) ```python AiModel: - name: str - displayName: str - connectorType: str - contextLength: int - qualityRating: int # 1-10 - speedRating: int # 1-10 - costPer1kTokensInput: float - costPer1kTokensOutput: float - operationTypes: List[OperationTypeRating] AiCallOptions: - operationType: OperationTypeEnum - resultFormat: str - processingMode: ProcessingModeEnum - priority: PriorityEnum - modelName: str (optional) AiCallRequest: - prompt: str - context: str (optional) - options: AiCallOptions - placeholders: List[PromptPlaceholder] (optional) AiCallResponse: - content: str - modelName: str - priceCHF: float - processingTime: float - bytesSent: int - bytesReceived: int - errorCount: int ``` #### Extraction Models (datamodelExtraction.py) ```python ContentPart: - label: str - data: str - metadata: ContentMetadata - typeGroup: str ContentExtracted: - id: str - parts: List[ContentPart] ContentMetadata: - size: int - mimeType: str - pages: int (optional) - error: str (optional) MergeStrategy: - mergeType: str - config: dict ``` ### Data Model Usage Data models are imported across all layers: ```python # In any layer from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage from modules.datamodels.datamodelAi import AiCallOptions ``` --- ## AI Core System (@aicore/) ### Purpose The AI core provides dynamic model selection based on KPIs, business requirements, and operation types. It harmonizes model parameters for optimal performance. ### Key Components #### aicoreModelRegistry.py **Purpose**: Centralized model registry for all AI connectors **Key Functions:** ```python # Discover and register connectors connectors = modelRegistry.discoverConnectors() modelRegistry.registerConnector(connector) # Get available models availableModels = modelRegistry.getAvailableModels() # Get models by operation type models = modelRegistry.getModelsByOperationType(operationType) # Get specific model model = modelRegistry.getModel(modelName) ``` #### aicoreModelSelector.py **Purpose**: Intelligent model selection based on requirements **Selection Criteria:** 1. **Operation Type**: Primary filter (must match) 2. **Context Length**: Must fit within model limits 3. **Prompt Size**: Checks if prompt fits 4. **Processing Mode**: Basic/Advanced/Detailed 5. **Priority**: Speed/Quality/Cost/Balanced **Selection Algorithm:** ```python def selectModel(prompt, context, options, availableModels): # Step 1: Filter by operation type operationFiltered = filterByOperationType(availableModels, options) # Step 2: Filter by prompt size promptFiltered = filterByPromptSize(operationFiltered, prompt) # Step 3: Calculate scores scoredModels = calculateScores(promptFiltered, prompt, context, options) # Step 4: Sort by score sortedModels = sortByScore(scoredModels) return sortedModels[0] # Best model ``` **Scoring Formula:** ``` score = ( operationTypeRating * 1000.0 + # Primary sizeRating + # Context fit processingModeRating + # Mode compatibility priorityRating # Speed/Quality/Cost ) ``` #### aicorePlugin*.py AI connectors for different providers: - **aicorePluginOpenai**: OpenAI models (GPT-4, etc.) - **aicorePluginAnthropic**: Claude models - **aicorePluginPerplexity**: Perplexity AI - **aicorePluginTavily**: Tavily search - **aicorePluginInternal**: Internal models **Connector Interface:** ```python class BaseConnectorAi(ABC): def getModels(self) -> List[AiModel]: """Return all available models from this connector.""" pass def getConnectorType(self) -> str: """Return connector type identifier.""" pass ``` ### Dynamic Model Selection **Selection Process:** ``` 1. Analyze request (prompt, context, operation type) 2. Get all available models from registry 3. Filter by operation type (MUST match) 4. Filter by size constraints 5. Score each model 6. Select best model 7. Execute with failover to backup models ``` **Failover Mechanism:** ```python # Try models in priority order for model in failoverModels: try: result = await callModel(model, prompt, context) return result except Exception: # Try next model continue # If all models fail raise Exception("All models failed") ``` ### Operation Types Models are rated for different operation types: - `GENERATE`: Text generation - `ANALYZE`: Text analysis - `EXTRACT`: Information extraction - `PLAN`: Task planning - `IMAGE_GENERATE`: Image generation - `IMAGE_ANALYZE`: Image analysis **Rating System:** - Each model has ratings per operation type (1-10) - Higher rating = better for that operation - Rating * 1000 is primary sorting criteria ### Processing Modes - `BASIC`: Simple operations, fast processing - `ADVANCED`: Moderate complexity - `DETAILED`: High complexity, thorough analysis **Compatibility:** - Models can handle their mode or downgrade - Mode compatibility affects scoring ### Priority Handling - `BALANCED`: Default, considers all factors - `SPEED`: Optimize for speed - `QUALITY`: Optimize for quality - `COST`: Optimize for cost **Priority Scoring:** ```python if priority == "SPEED": score += model.speedRating / 10.0 elif priority == "QUALITY": score += model.qualityRating / 10.0 elif priority == "COST": score += costOptimization(model) ``` ### Harmonized Parameters The system harmonizes parameters across different model APIs: - Temperature/sampling parameters - Max tokens settings - Context length handling - Response format handling ### Usage Example ```python # In service layer from modules.aicore.aicoreModelRegistry import modelRegistry from modules.aicore.aicoreModelSelector import modelSelector # Get available models availableModels = modelRegistry.getAvailableModels() # Create options options = AiCallOptions( operationType=OperationTypeEnum.GENERATE, resultFormat="json", processingMode=ProcessingModeEnum.DETAILED, priority=PriorityEnum.QUALITY ) # Select best model selectedModel = modelSelector.selectModel( prompt=prompt, context=context, options=options, availableModels=availableModels ) # Execute with failover response = await executeWithFailover( models=modelSelector.getFailoverModelList(...), prompt=prompt, context=context, options=options ) ``` --- ## Data Flow ### Complete Request Flow ``` User Input ↓ WorkflowManager.workflowStart() ↓ WorkflowProcessor.generateTaskPlan() ↓ AI Planning Call (services.ai.callAiPlanning) ↓ Task Plan Created ↓ For each Task: ActionPlan Generated ↓ For each Action: Method Action Executed ↓ Service Layer Called ↓ Interface Layer Called ↓ Connector Executes ↓ Results Returned ↓ Results Aggregated ↓ Final Response Generated ``` ### Document Processing Flow ``` Source Document (PDF, DOCX, etc.) ↓ services.extraction.extractContent() ↓ Extractor (extractorPdf, extractorDocx, etc.) ↓ Chunker (if document too large) ↓ ContentExtracted (standard JSON structure) ↓ AI Processing (services.ai.callAiDocuments) ↓ Merger (merge AI results) ↓ Renderer (rendererHtml, rendererPdf, etc.) ↓ Destination Document (HTML, PDF, etc.) ``` --- ## Design Principles ### 1. Layered Architecture Each layer has specific responsibilities and imports only from layers below it. ### 2. Standardization Standardized data models, interfaces, and method signatures enable plug & play components. ### 3. Dynamic Selection AI models selected dynamically based on requirements, not hardcoded. ### 4. Failover Mechanism Automatic failover to backup models ensures reliability. ### 5. Progress Tracking Complete progress tracking throughout workflow execution. ### 6. Document Transformation Standardized JSON intermediate format enables any-to-any document transformation. ### 7. Modularity Clear separation between workflows, methods, services, interfaces, and connectors. ### 8. Type Safety Pydantic models ensure type safety and validation across the system. --- ## Development Guidelines ### Adding a New Method 1. Create `methodX.py` in `modules/workflows/methods/` 2. Inherit from `MethodBase` 3. Implement `@action` decorated methods 4. Follow parameter principles 5. Delegate to services layer ```python class MethodX(MethodBase): def __init__(self, services): super().__init__(services) self.name = "x" self.description = "X operations" @action async def operation(self, parameters): # Extract parameters # Call services layer # Return ActionResult pass ``` ### Adding a New Service 1. Create `serviceX/` directory 2. Implement `mainServiceX.py` with public API 3. Add sub-modules as needed 4. Register in `services/__init__.py` ```python # In services/__init__.py from .serviceX.mainServiceX import ServiceX self.x = PublicService(ServiceX(self)) ``` ### Adding a New Connector 1. Create `connectorInterfaceXProvider.py` in `connectors/` 2. Implement interface contract 3. Ensure standardization 4. Handle errors gracefully 5. Register in interface ### Adding a New Extractor/Renderer 1. Create `extractorX.py` or `rendererX.py` 2. Register in `ExtractorRegistry` or `RendererRegistry` 3. Implement interface: - `extract(documentBytes, fileName, mimeType) -> ContentExtracted` - or `render(contentData, options) -> bytes` --- ## Error Handling ### Workflow Errors - Caught at workflow processor level - Workflow status set to "failed" - Error logged and returned to user ### Action Errors - Caught at method level - ActionResult returned with `success=False` - Error message in `error` field ### Service Errors - Caught at service level - Logged with context - Re-raised or handled gracefully ### Connector Errors - Caught at connector level - Retry with next connector (if applicable) - Error logged and escalated --- ## Logging ### Log Levels - `INFO`: Normal operation flow - `DEBUG`: Detailed debugging information - `WARNING`: Non-critical issues - `ERROR`: Critical errors ### Log Structure ``` timestamp | level | module | message | context ``` ### Workflow Logs ```python self.services.workflow.storeLog(workflow, { "message": "Task completed", "type": "info", "status": "success", "progress": 100 }) ``` --- ## Testing ### Unit Tests Test individual components in isolation. ### Integration Tests Test interaction between components. ### Workflow Tests Test complete workflow execution. ### Mock Services Use mock services for testing without external dependencies. --- ## Performance Considerations ### Async/Await All I/O operations are async for optimal performance. ### Lazy Initialization Services initialized lazily to reduce startup time. ### Caching Model registry caches connector instances. ### Chunking Large documents chunked for efficient processing. ### Failover Automatic failover ensures reliability without sacrificing performance. --- ## Security ### Access Control User permission checks at interface level. ### Data Validation Pydantic models validate all inputs. ### Sanitization Prompt content sanitized to prevent injection. ### Encryption Sensitive data encrypted at rest and in transit. --- ## Conclusion The PowerOn workflow system is a sophisticated, multi-layered architecture that provides: - **Intelligent Task Automation**: AI-powered task and action planning - **Document Processing**: Plug & play extractors and renderers for any format - **Service Catalog**: Comprehensive business feature catalog - **Dynamic Model Selection**: Optimal AI model selection based on requirements - **Standardized Interfaces**: Consistent APIs across all components - **Extensibility**: Easy to add new methods, services, connectors The system emphasizes: - Separation of concerns - Standardization - Modularity - Type safety - Performance - Reliability