Gateway Development Framework: Connectors → Interfaces → Services → Workflows
This document explains the gateway's code logic and development framework to build market customer journey features. It focuses on how connectors, interfaces, services, and workflows compose a standardized services landscape that can be consumed by routes, features, and agent models to perform tasks and actions.
Purpose
- Unify external tools: Combine many third‑party APIs and utilities behind a consistent interface.
- Standardize service design: Model capabilities as reusable services with clear contracts.
- Enable workflow automation: Let agent models orchestrate multi‑step tasks using the centralized services.
- Abstract complexity: Hide implementation details behind clean, well-defined APIs.
- Enforce security and governance: Apply consistent access control, audit trails, and data isolation across all layers.
High‑Level Architecture
The Gateway follows a layered architecture pattern with clear separation of concerns:
- Connectors: Vendor-specific adapters for external systems (databases, APIs, cloud services) handling auth, transport, retries, and basic mapping.
- Interfaces: Normalization layer exposing common contracts independent of any single vendor. Provides CRUD operations, access control, and data transformation.
- Services: Business‑level capabilities built on interfaces, composed into feature‑ready functions. Orchestrate multiple interfaces and apply business rules.
- Service Center: Central registry/factory (the Services class) that instantiates and exposes services with consistent configuration, user context, and lifecycle management.
- Workflows & Methods: Orchestration engine that calls services to perform tasks/actions. Methods provide extensible, plugin-like actions that workflows can invoke.
Data/control flow: Client or Workflow → Service Center → Service → Interface → Connector → External Tool/Database
Directory Overview (gateway)
gateway/
├── modules/
│ ├── connectors/ # Vendor-specific adapters
│ │ ├── connectorDbPostgre.py # PostgreSQL database
│ │ ├── connectorDbJson.py # JSON file-based database
│ │ ├── connectorVoiceGoogle.py # Google Cloud Speech services
│ │ ├── connectorTicketsJira.py # JIRA integration
│ │ └── connectorTicketsClickup.py # ClickUp integration
│ │
│ ├── datamodels/ # Pydantic models defining data structures
│ │ ├── datamodelRealEstate.py
│ │ ├── datamodelChat.py
│ │ ├── datamodelAi.py
│ │ ├── datamodelUam.py # User & Mandate models
│ │ └── ...
│ │
│ ├── interfaces/ # Data access layer
│ │ ├── interfaceDbRealEstateObjects.py # CRUD operations
│ │ ├── interfaceDbRealEstateAccess.py # Access control
│ │ ├── interfaceDbChatObjects.py
│ │ ├── interfaceDbChatAccess.py
│ │ ├── interfaceDbAppObjects.py
│ │ ├── interfaceDbComponentObjects.py
│ │ ├── interfaceAiObjects.py # AI operations
│ │ ├── interfaceTicketObjects.py # Ticket systems
│ │ └── interfaceVoiceObjects.py # Voice operations
│ │
│ ├── services/ # Business-level capabilities
│ │ ├── __init__.py # Services container (Service Center)
│ │ ├── serviceAi/ # AI operations
│ │ ├── serviceChat/ # Workflow & document management
│ │ ├── serviceExtraction/ # Content extraction
│ │ ├── serviceGeneration/ # Document generation
│ │ ├── serviceNeutralization/ # Data anonymization
│ │ ├── serviceSharepoint/ # SharePoint integration
│ │ ├── serviceTicket/ # Ticket system integration
│ │ └── serviceUtils/ # Common utilities
│ │
│ ├── workflows/ # Orchestration engine
│ │ ├── workflowManager.py # Main orchestration controller
│ │ ├── processing/ # Processing logic
│ │ │ ├── workflowProcessor.py
│ │ │ ├── core/ # Core components
│ │ │ ├── modes/ # Execution modes
│ │ │ └── shared/ # Shared utilities
│ │ └── methods/ # Extensible action methods
│ │ ├── methodBase.py
│ │ ├── methodAi.py
│ │ └── ...
│ │
│ ├── routes/ # HTTP endpoints exposing capabilities
│ │ ├── routeChatPlayground.py
│ │ ├── routeWorkflows.py
│ │ └── ...
│ │
│ ├── features/ # Domain-specific business logic
│ │ └── mainChatPlayground.py
│ │
│ ├── security/ # Authentication, authorization, token management
│ │ ├── auth.py
│ │ ├── jwtService.py
│ │ ├── tokenManager.py
│ │ └── ...
│ │
│ └── shared/ # Cross-cutting utilities
│ ├── config.py
│ ├── logging.py
│ └── ...
1) Connectors: Many External Tools, One Adapter Shape
Role: Provide the lowest-level integration with external systems (databases, APIs, SDKs, auth, retries).
Responsibility:
- Authentication and credential handling: Manage API keys, OAuth tokens, database credentials
- Transport: HTTP/WebSocket clients, connection pooling, retry logic, circuit breaking
- Response normalization: Map vendor-specific responses to minimal internal shapes
- Error handling: Transform external errors into consistent internal error structures
Output: Vendor‑flavored data mapped to connector models, not directly used by workflows or services.
Key Guidelines:
- Keep connectors vendor‑specific and replaceable (e.g., connectorDbPostgre.py vs connectorDbJson.py)
- No business logic; only integration concerns and basic mapping
- Use duck typing (no formal interfaces) for flexibility
- Handle retries, timeouts, and connection management internally
- Return structured error responses, never raise exceptions to application layer
Example Connector Types:
- Database Connectors: PostgreSQL (connectorDbPostgre.py), JSON file-based (connectorDbJson.py)
- Voice Connectors: Google Cloud Speech (connectorVoiceGoogle.py)
- Ticket Connectors: JIRA (connectorTicketsJira.py), ClickUp (connectorTicketsClickup.py)
2) Interfaces: Stable Contracts Over Connectors
Role: Define capability‑oriented contracts (e.g., ChatObjects, AppObjects, AiObjects) and map connector outputs into interface DTOs.
Responsibility:
- Normalize differing vendors: Convert vendor-specific data into consistent domain objects
- Hide vendor peculiarities: Abstract away implementation details behind clean, typed DTOs
- Provide CRUD operations: Create, Read, Update, Delete methods for domain entities
- Enforce access control: Apply user privilege checks and mandate-based filtering
- Offer capability toggles: Sensible defaults and configuration options
Output: Clean, stable methods used by services (e.g., getWorkflow(), createMessage(), call()).
Interface Structure:
Interfaces are split into two file types:
- Objects Files (interface*Objects.py): CRUD operations and business logic
- Access Files (interface*Access.py): Permission checking and data filtering
Key Guidelines:
- Prefer capability names over vendor names (e.g., ChatObjects, not PostgreChatObjects)
- Keep interfaces small, cohesive, and testable with mocks
- Always require user context for database interfaces (enables access control)
- Use Pydantic models (datamodels) for type safety
- Apply Unified Access Management (UAM) for all database queries
Example Interface Types:
- Database Interfaces: interfaceDbChatObjects, interfaceDbAppObjects, interfaceDbRealEstateObjects
- External System Interfaces: interfaceAiObjects, interfaceTicketObjects, interfaceVoiceObjects
3) Services: Business‑Level Capabilities
Role: Compose one or more interfaces to implement feature‑ready operations (e.g., "answer question with web grounding", "extract and analyze documents").
Responsibility:
- Apply business rules: Validation, guardrails, transformations, data enrichment
- Orchestrate multiple interfaces: Coordinate between interfaces and other services
- Emit domain events/metrics: Track operations, costs, performance
- Enforce security policies: Apply additional security checks beyond interface layer
- Handle complex workflows: Multi-step operations with error recovery
Output: High‑level operations that workflows and routes can call atomically.
Service Container Pattern:
All services are initialized through the Services container. Initialize with user context using Services(user=current_user, workflow=current_workflow), then access services via services.ai.callAiDocuments(), services.chat.storeMessageWithDocuments(), etc.
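A minimal usage sketch of this pattern follows; the Services import path and the keyword arguments passed to the service calls are assumptions, only the container construction and the services.ai / services.chat call style come from this document.
# Sketch only: argument names for the service calls are illustrative
from modules.services import Services

def handleRequest(currentUser, currentWorkflow, documents):
    # One container per request, carrying user (and optional workflow) context
    services = Services(user=currentUser, workflow=currentWorkflow)
    # Business-level calls go through the container, never through connectors
    aiResult = services.ai.callAiDocuments(documents=documents)
    services.chat.storeMessageWithDocuments(message="Stored AI result", documents=documents)
    return aiResult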
Key Guidelines:
- Services depend on interfaces, not connectors directly
- Keep input/output DTOs explicit and versioned when necessary
- Services can call other services via self.services
- Use the PublicService wrapper to expose only public methods
- Keep services stateless (no session state, use the database for persistence)
Core Services:
- AI Service: AI model operations, planning, document processing
- Chat Service: Workflow management, message handling, document resolution
- Extraction Service: Multi-format document extraction and processing
- Generation Service: Document rendering in various formats
- Neutralization Service: Data anonymization for GDPR compliance
- SharePoint Service: SharePoint integration
- Ticket Service: Ticket system integration (Jira, ClickUp)
- Utils Service: Common utilities (config, events, time, debug)
4) Centralized Service Center
Role: A registry/factory (Services class) that instantiates and exposes services with consistent configuration and lifecycle.
Responsibility:
- Discoverability: List/get services by capability key (e.g., services.ai, services.chat)
- Configuration: Environment, credentials, routing to specific vendors
- Cross‑cutting: User context, workflow context, interface access
- Lifecycle management: Initialize services with proper dependencies
- Access control: Provide user context to all services and interfaces
Usage Pattern:
- Route receives request with authenticated user (via the getCurrentUser dependency)
- Create the Services container with user context using Services(user=currentUser)
- Call the service method with typed input (e.g., services.ai.callAiDocuments())
- Receive typed output
Service Center Structure:
The Services class initializes with user and optional workflow context. It initializes interfaces via getChatInterface(), getAppInterface(), getComponentInterface(), and wraps all services in PublicService wrappers (e.g., PublicService(AiService(self))).
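A hedged sketch of that structure, as it might appear in modules/services/__init__.py (where PublicService is defined); the interface factory import paths and the exact attribute names are assumptions based on the conventions above.
from modules.interfaces.interfaceDbChatObjects import getChatInterface          # paths assumed
from modules.interfaces.interfaceDbAppObjects import getAppInterface
from modules.interfaces.interfaceDbComponentObjects import getComponentInterface
from modules.services.serviceAi.mainServiceAi import AiService
from modules.services.serviceChat.mainServiceChat import ChatService

class Services:
    def __init__(self, user, workflow=None):
        self.user = user
        self.workflow = workflow
        # Interfaces are created once per container, carrying the user context
        self.interfaceDbChat = getChatInterface(user)
        self.interfaceDbApp = getAppInterface(user)
        self.interfaceDbComponent = getComponentInterface(user)
        # Services receive the whole container and are wrapped to expose only public methods
        self.ai = PublicService(AiService(self))
        self.chat = PublicService(ChatService(self))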
Key Features:
- User Context: Every service has access to self.services.user for access control
- Workflow Context: Services can access self.services.workflow for workflow-aware operations
- Interface Access: Services access interfaces via self.services.interfaceDbChat, etc.
- Service Composition: Services call other services via self.services.otherService.method()
5) Workflows & Agent Models
Role: Coordinate tasks and actions by invoking services in sequence/branches/loops.
Responsibility:
- Maintain execution state: Track workflow progress, round/task/action counters
- Choose actions: Use agent models (AI) or predefined plans to determine next steps
- Handle retries/compensation: Retry failed tasks with improvements, rollback on failure
- Record audit logs: Track all workflow steps, decisions, and outcomes
- Manage document flow: Resolve document references, track document lineage
Typical Pattern:
- Ingest user intent/context: Analyze user input, extract documents, detect language
- Plan next action: Use AI to generate task plan or follow predefined JSON plan
- Call services via Service Center: Invoke services to perform operations
- Persist outputs: Store results, update state, decide next step
- Generate feedback: Create completion messages, summarize results
Workflow Modes:
- Actionplan Mode: Batch planning with quality review and intelligent retry
- Dynamic Mode: Iterative, just-in-time action generation
- Automation Mode: Predefined JSON-based deterministic execution
Method System:
Workflows invoke actions through an extensible method system:
- Methods: Plugin-like classes that expose actions via the @action decorator
- Actions: Async methods that perform specific operations (e.g., methodAi.process(), methodSharepoint.search())
- Automatic Discovery: Methods are discovered at runtime via introspection
- Signature Generation: Action signatures are generated for use in AI prompts
Standardized Interface Example (Actual Implementation)
Interfaces like ChatObjects provide methods such as getWorkflow() and createMessage(). The AiObjects interface provides call() for AI model operations. Vendors like OpenAI/Anthropic implement AiObjects through connectors; database connectors implement ChatObjects. Services compose these interfaces.
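The contracts can be pictured as simple protocols; a sketch follows, where only the method names (getWorkflow, createMessage, call) come from this document and the parameter and return types are assumptions.
from typing import Any, Dict, List, Optional, Protocol

class ChatObjectsContract(Protocol):
    def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]: ...
    def createMessage(self, workflowId: str, message: Dict[str, Any]) -> Dict[str, Any]: ...

class AiObjectsContract(Protocol):
    def call(self, prompt: str, documents: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: ...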
Example Service Composition (Actual Implementation)
The AiService.callAiDocuments() method demonstrates service composition:
Steps:
- ExtractionService.extractContent() → extracts content from documents
- AiObjects.call() → processes the content with the AI model
- ChatService.storeWorkflowStat() → records statistics
Outputs: AI-generated content, processing statistics, cost tracking
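A hedged sketch of that composition; the attribute names used to reach the other services and the AI interface, and the method parameters, are assumptions.
class AiService:
    def __init__(self, services):
        self.services = services

    def callAiDocuments(self, prompt, documents):
        # 1) Extract content from the referenced documents
        extracted = self.services.extraction.extractContent(documents=documents)
        # 2) Process the extracted content with the AI model via the AiObjects interface
        aiResult = self.services.interfaceAi.call(prompt=prompt, documents=extracted)
        # 3) Record statistics (cost, timing) for observability
        self.services.chat.storeWorkflowStat(operation="callAiDocuments", result=aiResult)
        return aiResult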
Adding a New Capability
Step 1: Create Connector (if needed)
Add vendor adapter in modules/connectors/ (e.g., connectorNewVendor.py). The connector class should initialize with configuration, handle API calls, and return structured responses with {"success": True/False, "data": ...} format.
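A hedged connector sketch under these conventions; the vendor endpoint, field names, and the use of requests are illustrative, while the structured {"success": ..., "data": ...} return shape and the no-exceptions rule come from this document.
import requests

class ConnectorNewVendor:
    def __init__(self, apiKey: str, baseUrl: str, timeout: int = 30):
        self.apiKey = apiKey
        self.baseUrl = baseUrl
        self.timeout = timeout

    def fetchItems(self, query: str) -> dict:
        try:
            response = requests.get(
                f"{self.baseUrl}/items",
                params={"q": query},
                headers={"Authorization": f"Bearer {self.apiKey}"},
                timeout=self.timeout,
            )
            response.raise_for_status()
            # Map the vendor payload to a minimal internal shape
            return {"success": True, "data": response.json()}
        except requests.RequestException as error:
            # Structured error instead of raising into the application layer
            return {"success": False, "data": None, "error": str(error)}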
Step 2: Create Interface
Implement capability contract in modules/interfaces/ (e.g., interfaceNewCapabilityObjects.py). The interface class should initialize with user context, use the connector, and provide normalized methods like performOperation() that return domain objects.
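A hedged sketch continuing the connector example from Step 1; the class and method names follow the conventions above, but the normalization shown is illustrative.
class NewCapabilityObjects:
    def __init__(self, currentUser=None):
        self.currentUser = currentUser
        self.userId = currentUser.id if currentUser else None
        # The interface owns the connector; callers never see vendor details
        self.connector = ConnectorNewVendor(apiKey="...", baseUrl="https://vendor.example")

    def performOperation(self, query):
        result = self.connector.fetchItems(query)
        if not result.get("success"):
            return []
        # Normalize the vendor payload into datamodel objects before returning
        return result.get("data") or []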
Step 3: Create Service
Compose the interface in modules/services/serviceNewCapability/mainServiceNewCapability.py. The service class should initialize with the services container, access the interface, and provide business-level methods like performBusinessOperation() that apply validation, call the interface, and enrich results.
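A hedged sketch of such a service; the validation and enrichment shown stand in for real business rules.
class NewCapabilityService:
    def __init__(self, services):
        self.services = services

    def performBusinessOperation(self, query: str) -> dict:
        # Business rule: validate input before touching the interface
        if not query or not query.strip():
            return {"success": False, "error": "query must not be empty"}
        interface = NewCapabilityObjects(self.services.user)
        items = interface.performOperation(query)
        # Enrich the result before handing it back to routes or workflows
        return {"success": True, "data": items, "count": len(items)}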
Step 4: Register in Service Center
Wire into Services class in modules/services/__init__.py. Import the service class and wrap it in PublicService() (e.g., self.newCapability = PublicService(NewCapabilityService(self))).
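Sketch of that wiring (import path assumed from the naming conventions above):
from modules.services.serviceNewCapability.mainServiceNewCapability import NewCapabilityService

class Services:
    def __init__(self, user, workflow=None):
        ...
        self.newCapability = PublicService(NewCapabilityService(self))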
Step 5: Expose via Route (if needed)
Add HTTP endpoint in modules/routes/routeNewCapability.py. Create a route handler that uses getCurrentUser dependency, creates a Services instance, calls the service method, and returns the result.
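A hedged route sketch; the URL path and response shape are illustrative, while the getCurrentUser dependency and the Services usage follow the patterns in this document.
from fastapi import APIRouter, Depends
from modules.features.shared.dependencies import getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.services import Services

router = APIRouter()

@router.get("/new-capability/items")
async def getItems(query: str, currentUser: User = Depends(getCurrentUser)):
    # Resolve the service through the Service Center so access control applies
    services = Services(user=currentUser)
    return services.newCapability.performBusinessOperation(query)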
Step 6: Use in Workflows (if needed)
Create method action in modules/workflows/methods/methodNewCapability.py. Inherit from MethodBase, use the @action decorator on async methods, and return ActionResult objects with success status and documents.
Adding a New Database Domain
Adding a completely new database domain (like Real Estate, Projects, Inventory) requires creating datamodels, database interfaces, and access control. This section covers creating a new domain from scratch.
Overview
A new database domain consists of:
- Datamodels: Pydantic models defining data structures
- Database Interface Objects: CRUD operations for domain entities
- Database Interface Access: Access control and permission checking
- Database Configuration: Connection settings for the new database
- Service Integration: Optional service layer for business logic
Step 1: Create Datamodels
Create a new datamodel file in modules/datamodels/datamodel[Domain].py (e.g., datamodelRealEstate.py, datamodelProject.py).
Structure:
- Define Pydantic models inheriting from BaseModel
- Include enums for status fields and categories
- Add helper models for complex nested structures
- Use Field() with frontend metadata for UI generation
- Include standard fields: id, mandateId, _createdBy, _createdAt, _modifiedBy, _modifiedAt
Example Structure:
datamodel[Domain].py
├── Enums (StatusEnum, CategoryEnum, etc.)
├── Helper Models (GeoPoint, Address, etc.)
├── Main Entity Models
│ ├── Entity1 (id, mandateId, fields, timestamps)
│ ├── Entity2 (id, mandateId, fields, timestamps)
│ └── Entity3 (id, mandateId, fields, timestamps)
└── Relationship Models (if needed)
Key Requirements:
- All main entities must have id: str (UUID)
- All main entities must have mandateId: str for multi-tenant isolation
- Include audit fields: _createdBy, _createdAt, _modifiedBy, _modifiedAt
- Use Field() with frontend_type, frontend_readonly, frontend_required for UI metadata
- Define relationships using ForwardRef if models reference each other
Example:
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
import uuid
class StatusEnum(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
ARCHIVED = "archived"
class Project(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
mandateId: str
name: str = Field(..., frontend_type="text", frontend_required=True)
status: StatusEnum = Field(..., frontend_type="select")
description: Optional[str] = Field(None, frontend_type="textarea")
_createdBy: Optional[str] = None
_createdAt: Optional[int] = None
_modifiedBy: Optional[str] = None
_modifiedAt: Optional[int] = None
Step 2: Create Database Interface Objects
Create modules/interfaces/interfaceDb[Domain]Objects.py (e.g., interfaceDbRealEstateObjects.py).
Structure:
- [Domain]Objects class that initializes the database connector
- CRUD methods for each entity: create[Entity](), get[Entity](), get[Entities](), update[Entity](), delete[Entity]()
- Query execution method: executeQuery() for custom SQL queries
- User context management: setUserContext()
Key Components:
1. Database Initialization:
def _initializeDatabase(self):
dbHost = APP_CONFIG.get("DB_[DOMAIN]_HOST", "localhost")
dbDatabase = APP_CONFIG.get("DB_[DOMAIN]_DATABASE", "poweron_[domain]")
dbUser = APP_CONFIG.get("DB_[DOMAIN]_USER")
dbPassword = APP_CONFIG.get("DB_[DOMAIN]_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_[DOMAIN]_PORT", 5432))
self.db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId=self.userId if self.userId else None,
)
self.db.initDbSystem()
2. CRUD Pattern:
def create[Entity](self, entity: [Entity]) -> [Entity]:
# Ensure mandateId is set
if not entity.mandateId:
entity.mandateId = self.mandateId
# Apply access control
self.access.uam([Entity], [])
# Save to database
self.db.recordCreate([Entity], entity.model_dump())
return entity
def get[Entity](self, entityId: str) -> Optional[[Entity]]:
records = self.db.getRecordset(
[Entity],
recordFilter={"id": entityId}
)
if not records:
return None
# Apply access control
filtered = self.access.uam([Entity], records)
if not filtered:
return None
return [Entity](**filtered[0])
def get[Entities](self, filters: Optional[Dict] = None) -> List[[Entity]]:
records = self.db.getRecordset([Entity], recordFilter=filters or {})
filtered = self.access.uam([Entity], records)
return [[Entity](**r) for r in filtered]
def update[Entity](self, entityId: str, updates: Dict) -> Optional[[Entity]]:
# Check access control
self.access.canModify([Entity], entityId)
# Update in database
self.db.recordUpdate([Entity], entityId, updates)
# Return updated entity
return self.get[Entity](entityId)
def delete[Entity](self, entityId: str) -> bool:
# Check access control
self.access.canModify([Entity], entityId)
# Delete from database
self.db.recordDelete([Entity], entityId)
return True
3. Singleton Factory Pattern:
_[domain]Interfaces = {}
def get[Domain]Interface(currentUser: Optional[User] = None) -> [Domain]Objects:
"""Factory function to get or create interface instance."""
userId = currentUser.id if currentUser else None
if userId not in _[domain]Interfaces:
_[domain]Interfaces[userId] = [Domain]Objects(currentUser)
return _[domain]Interfaces[userId]
Step 3: Create Database Interface Access
Create modules/interfaces/interfaceDb[Domain]Access.py (e.g., interfaceDbRealEstateAccess.py).
Structure:
- [Domain]Access class that handles permission checking
- uam() method for Unified Access Management (filtering and flagging)
- canModify() method for write permission checking
Key Components:
1. Access Control Class:
class [Domain]Access:
def __init__(self, currentUser: User, db: DatabaseConnector):
self.currentUser = currentUser
self.userId = currentUser.id
self.mandateId = currentUser.mandateId
self.userRole = currentUser.role
self.db = db
def uam(self, model: Type[BaseModel], records: List[Dict]) -> List[Dict]:
"""Unified Access Management: Filter records and add access flags."""
if self.userRole == "SYSADMIN":
# SYSADMIN sees all records
filtered = records
elif self.userRole == "ADMIN":
# ADMIN sees all records in their mandate
filtered = [r for r in records if r.get("mandateId") == self.mandateId]
else:
# USER sees only their own records
filtered = [r for r in records if r.get("_createdBy") == self.userId]
# Add access flags
for record in filtered:
record["_hideView"] = False
record["_hideEdit"] = not self.canModify(model, record.get("id"))
record["_hideDelete"] = not self.canModify(model, record.get("id"))
return filtered
def canModify(self, model: Type[BaseModel], recordId: str) -> bool:
"""Check if user can modify a record."""
if self.userRole == "SYSADMIN":
return True
# Get record to check ownership
records = self.db.getRecordset(model, recordFilter={"id": recordId})
if not records:
return False
record = records[0]
if self.userRole == "ADMIN":
# ADMIN can modify records in their mandate
return record.get("mandateId") == self.mandateId
else:
# USER can only modify their own records
return record.get("_createdBy") == self.userId
2. Import in Objects File:
from modules.interfaces.interfaceDb[Domain]Access import [Domain]Access
Step 4: Configure Database Connection
Add database configuration to config.ini:
[Database]
DB_[DOMAIN]_HOST=localhost
DB_[DOMAIN]_DATABASE=poweron_[domain]
DB_[DOMAIN]_USER=postgres
DB_[DOMAIN]_PASSWORD_SECRET=your_password
DB_[DOMAIN]_PORT=5432
Database Creation:
- The DatabaseConnector.initDbSystem() method automatically creates the database if it doesn't exist
- Tables are created on-demand when first accessed via _ensureTableExists()
- No manual database schema creation is needed
Step 5: Register Interface in Services (Optional)
If you need business logic, create a service that uses the interface:
Create Service (modules/services/service[Domain]/mainService[Domain].py):
class [Domain]Service:
def __init__(self, services: 'Services'):
self.services = services
def get[Domain]Interface(self) -> [Domain]Objects:
"""Get interface instance with current user context."""
return get[Domain]Interface(self.services.workflow.currentUser)
def performBusinessOperation(self, ...):
"""Business-level method that uses interface."""
interface = self.get[Domain]Interface()
# Apply business logic
# Call interface methods
# Return enriched results
Register in Service Center (modules/services/__init__.py):
from modules.services.service[Domain].mainService[Domain] import [Domain]Service
class Services:
def __init__(self, ...):
...
self.[domain] = PublicService([Domain]Service(self))
Step 6: Create Routes (Optional)
If you need HTTP endpoints, create modules/routes/route[Domain].py:
from fastapi import APIRouter, Depends
from modules.features.shared.dependencies import getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.services import Services
from modules.interfaces.interfaceDb[Domain]Objects import get[Domain]Interface
router = APIRouter()
@router.get("/[domain]/entities")
async def getEntities(
currentUser: User = Depends(getCurrentUser)
):
"""Get all entities."""
interface = get[Domain]Interface(currentUser)
entities = interface.get[Entities]()
return {"success": True, "data": [e.model_dump() for e in entities]}
Step 7: Use in Workflows (Optional)
If you need workflow actions, create modules/workflows/methods/method[Domain].py:
from typing import Any, Dict
from modules.workflows.methods.methodBase import MethodBase, action, ActionResult
from modules.interfaces.interfaceDb[Domain]Objects import get[Domain]Interface
class Method[Domain](MethodBase):
name = "[domain]"
description = "[Domain] operations"
def __init__(self, services):
super().__init__(services)
@action
async def performOperation(self, parameters: Dict[str, Any]) -> ActionResult:
"""Perform domain operation."""
interface = get[Domain]Interface(self.services.workflow.currentUser)
# Perform operation
# Return ActionResult
Complete Example: Real Estate Domain
Datamodels (datamodelRealEstate.py):
- Entities: Projekt, Parzelle, Dokument, Kanton, Gemeinde, Land
- Geographic data: GeoPunkt, GeoPolylinie
- Context/notes: Kontext
- Enums: StatusProzess, DokumentTyp, GeoTag
Interface Objects (interfaceDbRealEstateObjects.py):
- RealEstateObjects class
- CRUD methods for all entities
- executeQuery() for custom SQL
- Database initialization with DB_REALESTATE_* config
Interface Access (interfaceDbRealEstateAccess.py):
- RealEstateAccess class
- uam() method for filtering
- canModify() method for permissions
Configuration:
DB_REALESTATE_HOST=localhost
DB_REALESTATE_DATABASE=poweron_realestate
DB_REALESTATE_USER=postgres
DB_REALESTATE_PASSWORD_SECRET=...
DB_REALESTATE_PORT=5432
Best Practices
1. Naming Conventions:
- Datamodel file: datamodel[Domain].py (PascalCase domain name)
- Interface Objects: interfaceDb[Domain]Objects.py
- Interface Access: interfaceDb[Domain]Access.py
- Database config: DB_[DOMAIN]_* (uppercase with underscores)
2. Mandate Isolation:
- Always set mandateId on create operations
- Filter by mandateId in access control
- Never expose data across mandates
3. Access Control:
- Always call self.access.uam() before returning records
- Always call self.access.canModify() before write operations
- Respect role hierarchy: SYSADMIN > ADMIN > USER
4. Error Handling:
- Validate user context before operations
- Handle missing records gracefully (return None, not raise)
- Log errors with context (user ID, mandate ID, operation)
5. Database Management:
- Let DatabaseConnector handle table creation automatically
- Use _ensureTableExists() for supporting tables with foreign keys
- Don't manually create database schemas
6. Testing:
- Test CRUD operations with different user roles
- Test mandate isolation (users can't see other mandates' data)
- Test access control (users can't modify others' records)
Common Patterns
Pattern 1: Simple Domain (Single Entity)
- One main entity model
- Basic CRUD operations
- Standard access control
Pattern 2: Hierarchical Domain (Parent-Child)
- Multiple related entities
- Foreign key relationships
- Cascade operations (delete children when parent deleted)
Pattern 3: Complex Domain (Multiple Entities + Relationships)
- Multiple entities with relationships
- Supporting tables (lookup tables, reference data)
- Custom query methods for complex operations
Security & Governance
Access Control
- RBAC: Role-based access control enforced at the Interface layer (interface*Access.py)
  - SYSADMIN: Full system access, all mandates
  - ADMIN: Full access within mandate
  - USER: Access to own records only
- UAM: Unified Access Management filters recordsets by privilege and adds access flags (_hideView, _hideEdit, _hideDelete)
Secrets Management
- Centralized Configuration: Credentials stored in config.ini with encryption
- Interface-Level Access: Connectors receive credentials through interfaces, not directly
- No Leakage: Credentials never exposed to workflows or services
Audit
- Automatic Tracking: All database operations include _createdBy, _modifiedBy, _createdAt, _modifiedAt
- Workflow Logging: Workflow steps logged via ChatService.storeLog()
- Security Events: Authentication events logged via auditLogger.logSecurityEvent()
Quotas
- Rate Limiting: Applied at route level using slowapi.Limiter
- Token Refresh Limits: OAuth token refresh limited to 3 attempts per hour per connection
- Cost Tracking: AI operations track costs via ChatService.storeWorkflowStat()
Observability
Structured Logging
- Layer-Specific Loggers: Each layer uses module-specific loggers (e.g., logging.getLogger("modules.services.serviceAi"))
- Context Information: Logs include user ID, workflow ID, operation context
- Error Details: Exceptions logged with full stack traces and context
Tracing
- Operation IDs: Long-running operations use unique operation IDs for tracking
- Progress Logging: ChatService.progressLogStart(), progressLogUpdate(), progressLogFinish()
- Workflow State: Workflow state persisted to database for debugging
Metrics
- Per-Capability Tracking: Services track operation counts, costs, processing time
- Workflow Statistics: ChatStat records track bytes sent/received, error counts, prices
- Performance Monitoring: Processing time tracked for all AI calls and service operations
Minimal Request Lifecycle
sequenceDiagram
participant Client
participant Route
participant Services as Service Center
participant Service
participant Interface
participant Connector
participant External as External System/DB
Client->>Route: HTTP Request
Route->>Route: Authenticate (getCurrentUser)
Route->>Services: Create(user, workflow)
Services->>Services: Initialize interfaces
Services->>Services: Initialize services
Services-->>Route: services instance
Route->>Service: services.capability.operation()
Service->>Interface: interface.method(params)
Interface->>Interface: Apply access control (UAM)
Interface->>Connector: connector.operation(params)
Connector->>External: API call / DB query
External-->>Connector: Response
Connector-->>Interface: Normalized data
Interface-->>Service: Domain object
Service-->>Services: Business result
Services-->>Route: Result
Route-->>Client: HTTP Response
Steps:
- Route receives request or workflow triggers an action
- Service Center resolves service instance and validates user context
- Service executes using interfaces; interfaces call connectors
- Results propagate back; logs/metrics recorded; workflow advances state
Benefits
- Replace vendors without breaking services: Interfaces shield changes (e.g., swap PostgreSQL for JSON connector)
- Accelerate feature delivery: Services are reusable building blocks
- Improve reliability and security: Centralized policies and observability
- Empower workflows/agents: Perform complex tasks with simple, typed calls
- Type safety: Pydantic models ensure data consistency
- Testability: Clear boundaries enable mocking and unit testing
- Maintainability: Separation of concerns makes code easier to understand and modify
Quick Map to Code (for orientation)
- gateway/modules/connectors/ → Vendor adapters (e.g., connectorDbPostgre.py, connectorVoiceGoogle.py)
- gateway/modules/interfaces/ → Capability contracts (e.g., interfaceDbChatObjects.py, interfaceAiObjects.py)
- gateway/modules/services/ → Composed capabilities (e.g., serviceAi/mainServiceAi.py, serviceChat/mainServiceChat.py)
- gateway/modules/workflows/ → Orchestrations/agents (e.g., workflowManager.py, methods/methodAi.py)
- gateway/modules/routes/ → HTTP endpoints (e.g., routeChatPlayground.py, routeWorkflows.py)
This framework is the backbone for market customer journey features: build once as services, reuse everywhere in workflows.
Visuals
Layered Architecture
flowchart TB
subgraph ClientOrWorkflow[Client / Workflow Engine]
C[Feature or Agent Task]
end
subgraph ServiceCenter[Service Center]
SC[Services Container\nUser Context, Interfaces, Services]
end
subgraph Services[Services]
S1[AI Service]
S2[Chat Service]
S3[Extraction Service]
S4[Generation Service]
end
subgraph Interfaces[Interfaces]
I1[ChatObjects]
I2[AppObjects]
I3[AiObjects]
I4[ComponentObjects]
end
subgraph Connectors[Connectors]
K1[PostgreSQL Connector]
K2[JSON Connector]
K3[Google Speech Connector]
K4[AI Provider Connectors]
end
subgraph External[External Systems]
E1[(PostgreSQL Database)]
E2[Google Cloud APIs]
E3[AI APIs\nOpenAI, Anthropic]
end
C --> SC --> S1 & S2 & S3 & S4
S1 --> I3
S2 --> I1
S3 --> I4
S4 --> I1 & I4
I1 --> K1
I2 --> K1
I3 --> K4
I4 --> K1
K1 --> E1
K2 --> E1
K3 --> E2
K4 --> E3
Request / Action Sequence
sequenceDiagram
participant Client as Client / Workflow
participant SC as Service Center
participant S as Service
participant I as Interface
participant AC as Access Control
participant K as Connector
participant EXT as External Tool/DB
Client->>SC: Request capability (e.g., services.ai.callAiDocuments)
SC->>SC: Initialize with user context
SC->>S: Get service instance
S->>I: Call normalized method (e.g., aiObjects.call)
I->>AC: Check permissions (UAM)
AC-->>I: Permission granted
I->>K: Prepare vendor-specific request
K->>EXT: API/DB call (auth, retries)
EXT-->>K: Response
K-->>I: Map to normalized DTO
I-->>S: Return normalized result
S->>S: Apply business logic
S-->>SC: Business output (validated, enriched)
SC-->>Client: Typed response, telemetry recorded
Service Center Components
graph LR
subgraph SC[Service Center - Services Class]
REG[Service Registry]
CTX[User Context]
WF[Workflow Context]
INT[Interface Access]
FAC[Service Factory]
end
REG --> FAC
CTX --> FAC
WF --> FAC
INT --> FAC
FAC -->|builds| Svc[(Service Instances)]
subgraph Layers[Below Services]
IF[Interfaces]
CON[Connectors]
end
Svc --> IF --> CON
subgraph Services[Services]
AI[AI Service]
Chat[Chat Service]
Extract[Extraction Service]
Gen[Generation Service]
end
Svc --> AI & Chat & Extract & Gen
Workflow State Machine (Conceptual)
stateDiagram-v2
[*] --> Plan
Plan: Decide next action (AI or rules)
Plan --> CallService: needs external capability
Plan --> Done: no more steps
CallService: Invoke via Service Center
CallService --> HandleResult
HandleResult: Persist, evaluate, log
HandleResult --> Plan: more work
HandleResult --> Done: goal achieved
Done --> [*]
Interface Access Control Flow
sequenceDiagram
participant Service
participant Interface as Interface Objects
participant Access as Access Control
participant Connector
participant DB as Database
Service->>Interface: CRUD Operation
Interface->>Access: Check permissions (uam)
Access->>Access: Check user privilege
Access->>Access: Filter by mandateId
Access->>Access: Check ownership (_createdBy)
Access->>Access: Add access flags
Access-->>Interface: Filtered data + flags
Interface->>Connector: Execute query
Connector->>DB: SQL Query
DB-->>Connector: Results
Connector-->>Interface: Raw data
Interface->>Interface: Transform to datamodel
Interface-->>Service: Domain objects with access flags
Development Best Practices
1. Always Use Service Center
✅ GOOD: Use Service Center via Services(user=current_user) and call services.ai.callAiDocuments(), services.chat.storeMessageWithDocuments(), etc.
❌ BAD: Direct interface access bypasses the service layer (e.g., calling getChatInterface(user).getWorkflow() directly).
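A short sketch of the contrast (the chat-service method shown is hypothetical; import paths assumed):
from modules.services import Services
from modules.interfaces.interfaceDbChatObjects import getChatInterface

def loadWorkflowGood(currentUser, workflowId):
    # Goes through the Service Center: user context, access control, and logging all apply
    services = Services(user=currentUser)
    return services.chat.getWorkflow(workflowId)   # hypothetical pass-through on the chat service

def loadWorkflowBad(currentUser, workflowId):
    # Bypasses the service layer and its business rules; avoid this
    return getChatInterface(currentUser).getWorkflow(workflowId)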
2. Keep Services Stateless
✅ GOOD: Stateless services use the database for persistence (e.g., self.services.interfaceDbApp.getCache()).
❌ BAD: Stateful services store data in instance variables (e.g., self.cache = {}).
3. Use Datamodels for Type Safety
✅ GOOD: Use Pydantic models like ChatWorkflow, ChatMessage from modules.datamodels.datamodelChat. Create instances with ChatWorkflow(**data) and return typed results.
❌ BAD: Use raw dictionaries without type safety.
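Sketch of validating at the boundary (assuming ChatWorkflow is a standard Pydantic model as described above):
from modules.datamodels.datamodelChat import ChatWorkflow

def toWorkflow(data: dict) -> ChatWorkflow:
    # Pydantic validates the raw dictionary once; downstream code works with typed objects
    return ChatWorkflow(**data)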
4. Apply Access Control
✅ GOOD: Interfaces apply UAM automatically (e.g., self.interfaceDbChat.getWorkflows() filters by user privilege).
❌ BAD: Bypass access control by calling connectors directly (e.g., self.connector.getRecordset() has no filtering).
5. Handle Errors Gracefully
✅ GOOD: Return structured errors with {"success": True/False, "data": ..., "error": ...} format. Log exceptions with context.
❌ BAD: Let exceptions propagate to callers without handling.
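A minimal sketch of the structured-error convention (logger name and payload handling are illustrative):
import logging

logger = logging.getLogger("modules.services.serviceExample")   # hypothetical module name

def safeOperation(payload: dict) -> dict:
    try:
        result = {"echo": payload}   # placeholder for the real operation
        return {"success": True, "data": result, "error": None}
    except Exception as error:
        logger.exception("operation failed for payload keys %s", list(payload))
        return {"success": False, "data": None, "error": str(error)}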
Workflow Engineering
Workflow engineering is the process of designing, building, and maintaining workflows that orchestrate multi-step tasks using the gateway's service layer. Workflows transform user requests into structured execution plans, coordinate action execution, and manage state throughout the process.
Understanding Workflow Architecture
Workflows operate at the highest level of the gateway architecture, orchestrating services to accomplish complex goals. They provide:
- Intelligent Planning: AI-powered task breakdown and action generation
- State Management: Track progress, maintain context, and handle errors
- Document Flow: Manage document references and lineage throughout execution
- Adaptive Execution: Retry failed tasks, learn from results, improve over time
- Multi-Mode Support: Different execution strategies for different use cases
Workflow Components
WorkflowManager: Main orchestration controller that manages workflow lifecycle (workflowStart(), workflowStop(), _workflowProcess())
WorkflowProcessor: Delegates to mode-specific implementations (Actionplan, Dynamic, Automation)
TaskPlanner: Generates structured task plans from user input using AI
ActionExecutor: Executes individual actions by invoking methods from the global methods catalog
MessageCreator: Creates and persists workflow messages with document associations
Method System: Extensible plugin framework for defining reusable actions
Workflow Execution Pipeline
Every workflow follows a four-stage pipeline:
- Send First Message: Analyze user intent, extract documents, detect language, normalize request
- Plan Tasks: Generate structured task plan with objectives, success criteria, and dependencies
- Execute Tasks: Execute each task sequentially, maintaining context between tasks
- Process Results: Generate feedback, create completion message, update workflow status
Workflow Modes
The gateway supports three distinct workflow modes, each optimized for different use cases:
Actionplan Mode
Strategy: Batch planning with quality review and intelligent retry
Characteristics:
- Plans all actions upfront before execution begins
- Reviews results against success criteria after execution
- Retries failed tasks up to 3 times with cumulative improvements
- Best for complex multi-step workflows with specific requirements
Use Cases: Data processing pipelines, document analysis with requirements, complex transformations
Execution Flow:
- Generate complete action plan for entire task
- Execute all actions sequentially
- Review results against success criteria
- Retry with improvements if criteria not met
- Return final result
Dynamic Mode
Strategy: Iterative, just-in-time action generation
Characteristics:
- Generates one action at a time based on current state
- Each action's result influences the next action
- Workflow path emerges organically based on findings
- Limited by maxSteps (default: 5) to prevent infinite loops
Use Cases: Research workflows, exploratory data analysis, iterative problem solving, uncertain paths
Execution Flow:
- Generate single next action based on current context
- Execute action immediately
- Evaluate if task objective is met
- Continue if objective not met and under max steps
- Return result when objective met or max steps reached
Automation Mode
Strategy: Predefined JSON-based deterministic execution
Characteristics:
- No AI planning or action generation
- User provides complete task and action plan in JSON format
- Deterministic execution (same input always produces same sequence)
- Fastest execution time (no planning overhead)
Use Cases: Repeated workflows, automated jobs, batch processing, template execution, routine operations
Execution Flow:
- Parse predefined JSON plan from user input
- Execute actions in order specified in JSON
- Collect results without review
- Return execution summary
Building New Workflows
New workflows are typically built using Actionplan or Dynamic modes, where AI generates the execution plan based on user input. This section covers how to create workflows that adapt to user requests.
Starting a New Workflow
Entry Point: WorkflowManager.workflowStart()
Required Parameters:
- userInput: UserInputRequest containing prompt, file IDs, and language
- workflowMode: WorkflowModeEnum (WORKFLOW_ACTIONPLAN, WORKFLOW_DYNAMIC, or WORKFLOW_AUTOMATION)
- workflowId: Optional ID to continue an existing workflow
Process:
- Create or load a ChatWorkflow record in the database
- Initialize workflow state (status="running", currentRound=1, counters=0)
- Discover and update method instances with current services
- Launch asynchronous processing pipeline
- Return workflow object immediately (non-blocking)
Example Flow:
Route → chatStart() → WorkflowManager.workflowStart() → _workflowProcess()
Workflow Input Processing
The first stage (_sendFirstMessage()) processes user input:
Intent Analysis: AI analyzes user input to extract:
- Detected language (ISO 639-1 code)
- Normalized request (full, explicit restatement)
- Core intent (primary goals and requirements)
- Bulky context items (large data blocks extracted as separate documents)
Document Management:
- Processes user-uploaded files (converts file IDs to ChatDocument objects)
- Extracts large content blocks from prompt (code snippets, tables, lists)
- Creates document records in component database
- Applies neutralization if enabled in user settings
- Associates documents with labels (e.g., "round1_usercontext")
Message Creation: Creates first message with role="user", status="first", and all associated documents
Task Planning
The second stage (_planTasks()) generates structured task plans:
Planning Process:
- Uses cleaned user intent from previous stage
- Calls WorkflowProcessor.generateTaskPlan(), which delegates to the mode-specific implementation
- For Actionplan/Dynamic modes: Uses TaskPlanner.generateTaskPlan() with AI
- For Automation mode: Parses the predefined JSON plan from user input
TaskPlan Structure:
- overview: High-level description of the plan
- tasks: Array of TaskStep objects
- userMessage: Original user request
TaskStep Structure:
- id: Unique task identifier
- objective: What the task should accomplish
- dependencies: Array of task IDs this task depends on
- successCriteria: Array of measurable criteria for task completion
- estimatedComplexity: Complexity estimate (simple, medium, complex)
- userMessage: User-facing description of the task
AI Planning: Uses services.ai.callAiPlanning() with quality settings to generate detailed task breakdown. The AI receives:
- User prompt and normalized intent
- Available methods and actions (from method discovery)
- Available documents and connections
- Workflow context and history
Task Execution
The third stage (_executeTasks()) executes each task sequentially:
For Each Task:
- Build a TaskContext containing:
  - Task details (objective, success criteria, dependencies)
  - Workflow state (current round, task, action numbers)
  - Available documents (from current and previous rounds)
  - Available connections (user's OAuth connections)
  - Previous task results (for context and dependencies)
- Call WorkflowProcessor.executeTask(), which delegates to mode-specific execution
Receive
TaskResultwith:success: Boolean indicating task completion statusfeedback: Human-readable summary of what was accomplisheddocuments: List of ChatDocument objects created during task executionreviewResult: Optional ReviewResult if quality review was performed
-
Prepare task handover data for subsequent tasks
-
Accumulate results for use by dependent tasks
Mode-Specific Execution:
Actionplan Mode:
- Generates complete action plan for entire task upfront
- Executes all actions sequentially
- Reviews results against success criteria
- Retries with improvements if criteria not met (max 3 attempts)
Dynamic Mode:
- Generates single next action based on current state
- Executes action immediately
- Evaluates if task objective is met
- Continues generating actions until objective met or max steps reached
Automation Mode:
- Uses predefined action list from JSON plan
- Executes actions in order specified
- No retry logic or quality review
Action Execution
Actions are executed by ActionExecutor.executeSingleAction():
Process:
- Resolve parameters (document references, connections, etc.)
- Look up method in global methods catalog
- Validate action exists within method
- Invoke action method with parameters
- Extract result text from ActionDocument objects
- Convert ActionDocuments to ChatDocuments for persistence
- Create action completion message
- Return ActionResult with success status and documents
Action Invocation: Actions are invoked using compound names (e.g., "ai.process", "sharepoint.search") or separate method/action names.
Document References: Actions receive document references in three formats:
- docItem:<id>:<filename>: Single document by ID
- docList:<label>: All documents with the label (newest)
- docList:<messageId>:<label>: Documents from a specific message
Result Handling: Action results are converted to ChatDocument objects, stored in database, and associated with workflow messages. Result labels (from execResultLabel) determine how results are routed to subsequent actions.
Workflow Completion
The fourth stage (_processWorkflowResults()) finalizes the workflow:
Completion Scenarios:
- Stopped: User-initiated stop, creates message with status="last", updates status to "stopped"
- Failed: Unrecoverable error, creates error message, updates status to "failed", logs error
- Completed: Successful completion, generates feedback via _generateWorkflowFeedback(), creates completion message, updates status to "completed"
Feedback Generation: Counts user and assistant messages, reports task completion status, provides concise overview of accomplishments.
State Persistence: Workflow state updated in both in-memory object and database for consistency.
Building Routine and Predefined Workflows
Routine and predefined workflows are reusable workflows that execute the same sequence of actions every time. They bypass the AI task planning step entirely by providing a complete task and action plan directly, giving you full control over the execution sequence. These workflows use Automation Mode (WORKFLOW_AUTOMATION) which skips intent analysis, task planning, and action generation, executing your provided plan deterministically.
When to Use Routine/Predefined Workflows
Routine and predefined workflows are ideal for:
- Repeated Operations: Tasks that run regularly with the same steps
- Batch Processing: Processing multiple items with identical workflow
- Scheduled Jobs: Automated workflows triggered by schedules or events
- Template Execution: Workflows that follow a standard pattern with variable inputs
- Deterministic Requirements: When the exact execution sequence must be guaranteed and consistent
- Performance Critical: Eliminating AI planning overhead improves execution speed
- Integration Workflows: Connecting multiple systems with fixed interaction patterns
- Testing and Debugging: Reproducible workflows for testing specific scenarios
How Routine/Predefined Workflows Work
Routine and predefined workflows use Automation Mode (WORKFLOW_AUTOMATION) which:
- Skips Intent Analysis: No AI analysis of user input for intent extraction
- Skips Task Planning: No AI-generated task plan, uses provided plan directly
- Skips Action Generation: No AI-generated actions, uses provided actions directly
- Direct Execution: Executes the provided plan deterministically
The workflow system extracts the JSON plan from the user input and executes it without any AI planning calls. This provides:
- Instant Execution: No planning overhead, immediate execution
- Full Determinism: Same input always produces identical execution sequence
- Complete Control: You define every step of the workflow
- Performance: Faster execution without AI planning delays
Providing Predefined Plans
There are two ways to provide predefined plans:
Method 1: Direct JSON in User Input
Embed the complete TaskPlan JSON directly in the user input prompt, wrapped in HTML comment markers. This is ideal for one-off workflows or testing:
User prompt text (optional)
<!--TEMPLATE_PLAN_START-->
{
"overview": "Process documents and generate report",
"tasks": [
{
"id": "task1",
"objective": "Extract content from documents",
"actions": [...]
}
],
"userMessage": "Process the uploaded documents"
}
<!--TEMPLATE_PLAN_END-->
The Automation mode extracts the JSON between <!--TEMPLATE_PLAN_START--> and <!--TEMPLATE_PLAN_END--> markers and uses it as the task plan.
Method 2: Via AutomationDefinition
Store the plan in an AutomationDefinition record for reusable, scheduled, or event-driven workflows. The plan is embedded in the template field and executed automatically via ChatObjects.executeAutomation().
AutomationDefinition Structure:
- id: Unique automation identifier (UUID)
- mandateId: Mandate scope for the automation
- label: Human-readable name
- schedule: Cron expression or event trigger (optional)
- template: Template text with placeholders and embedded plan
- placeholders: Dictionary mapping placeholder names to values
- active: Boolean flag to enable/disable the automation
- eventId: Event identifier for event-driven triggers
- status: Current execution status
- executionLogs: Array of execution history (last 50)
- _createdBy: User ID who created the automation
- _createdAt, _modifiedAt: Timestamps
Creation Process:
- Define task plan as JSON structure
- Embed plan in template with HTML comment markers
- Create AutomationDefinition record with template and placeholders
- Store in the database via ChatObjects.createAutomationDefinition() or the HTTP API
- Enable the automation by setting active=true
Storage and Management
Automated workflows are stored as AutomationDefinition records in the Chat database (PostgreSQL). All automation operations are exposed via HTTP API routes.
Storage Location: PostgreSQL database, managed via ChatObjects interface
API Routes: All automation operations are available via /api/automations endpoints:
- GET /api/automations: List all automations (with optional pagination)
- POST /api/automations: Create new automation definition
- GET /api/automations/{automationId}: Get single automation by ID
- PUT /api/automations/{automationId}: Update automation definition
- DELETE /api/automations/{automationId}: Delete automation definition
- POST /api/automations/{automationId}/execute: Execute automation immediately (manual/test execution)
- GET /api/automations/attributes: Get attribute definitions for frontend form generation
Rate Limiting:
- List/Get operations: 30 requests/minute
- Create/Update/Delete operations: 10 requests/minute
- Execute operation: 5 requests/minute
Example: Creating Automation via API:
POST /api/automations
Content-Type: application/json
{
"label": "Daily Report Generation",
"template": "Template with embedded plan...",
"placeholders": {"DATE": "2025-01-25"},
"schedule": "0 22 * * *",
"active": true
}
Example: Executing Automation via API:
POST /api/automations/{automationId}/execute
Response: ChatWorkflow object with created workflow
Plan Structure
The JSON plan must follow the exact TaskPlan structure:
Top-Level TaskPlan Object:
- overview: String describing the overall plan (required)
- tasks: Array of TaskStep objects (required, at least one task)
- userMessage: String with user-facing message (optional)
TaskStep Object (each task):
- id: Unique string identifier (required, e.g., "task1", "extract_content")
- objective: String describing what the task accomplishes (required)
- actions: Array of ActionItem objects (required, at least one action)
- successCriteria: Array of strings describing success conditions (optional, not validated in Automation mode)
- dependencies: Array of task IDs this task depends on (optional, ensures execution order)
- estimatedComplexity: String complexity estimate (optional: "simple", "medium", "complex")
- userMessage: String with user-facing task description (optional)
ActionItem Object (each action):
- id: Unique string identifier (required, e.g., "action1", "extract_docs")
- execMethod: String method name (required, e.g., "ai", "sharepoint", "extraction")
- execAction: String action name (required, e.g., "process", "search", "extractContent")
- execParameters: Dictionary of action-specific parameters (required)
- execResultLabel: String label for routing results (optional, e.g., "extracted_content")
- expectedDocumentFormats: Array of expected MIME types (optional)
- userMessage: String with user-facing action description (optional)
Action Parameters (execParameters dictionary):
- documentList: Array of document references (optional, e.g., ["docList:round1_usercontext"])
- connections: Array of connection references (optional, e.g., ["connection:msft:username"])
- Method-specific parameters (e.g., prompt, query, format, options)
Complete Workflow Example
Example: Document Processing and Report Generation
User Input with Embedded Plan:
Process the uploaded documents and generate a summary report.
<!--TEMPLATE_PLAN_START-->
{
"overview": "Extract content from uploaded documents, analyze with AI, and generate PDF report",
"tasks": [
{
"id": "extract_content",
"objective": "Extract text content from all uploaded documents",
"actions": [
{
"id": "extract_docs",
"execMethod": "extraction",
"execAction": "extractContent",
"execParameters": {
"documentList": ["docList:round1_usercontext"],
"options": {
"processDocumentsIndividually": false,
"chunkAllowed": true
}
},
"execResultLabel": "extracted_content"
}
],
"successCriteria": [
"All documents processed successfully",
"Content extracted without errors"
]
},
{
"id": "analyze_content",
"objective": "Analyze extracted content using AI",
"dependencies": ["extract_content"],
"actions": [
{
"id": "ai_analyze",
"execMethod": "ai",
"execAction": "process",
"execParameters": {
"prompt": "Analyze the following document content and provide a comprehensive summary with key insights:",
"documentList": ["docList:extracted_content"],
"options": {
"operationType": "TEXT_ANALYSIS",
"priority": "normal"
}
},
"execResultLabel": "analysis_result"
}
],
"successCriteria": [
"AI analysis completed",
"Summary generated successfully"
]
},
{
"id": "generate_report",
"objective": "Generate PDF report from analysis results",
"dependencies": ["analyze_content"],
"actions": [
{
"id": "render_pdf",
"execMethod": "generation",
"execAction": "renderReport",
"execParameters": {
"content": "Use the analysis results to create a formatted report",
"documentList": ["docList:analysis_result"],
"format": "pdf",
"title": "Document Analysis Report"
},
"execResultLabel": "final_report"
}
],
"successCriteria": [
"PDF report generated",
"Report contains all analysis results"
]
}
],
"userMessage": "Process documents and generate report"
}
<!--TEMPLATE_PLAN_END-->
Workflow Execution:
- Start workflow with WORKFLOW_AUTOMATION mode
- System extracts JSON plan from user input
- Skips intent analysis and task planning
- Executes tasks sequentially: extract_content → analyze_content → generate_report
- Each task executes its actions in order
- Results flow between tasks via document references
Starting Routine/Predefined Workflows
Method 1: Via API Route
Call the workflow start endpoint with workflowMode=WORKFLOW_AUTOMATION:
POST /api/chat/start?workflowMode=Automation
Body: {
"prompt": "User prompt with embedded plan...",
"listFileId": ["file1", "file2"],
"userLanguage": "en"
}
Method 2: Via Feature Function
Call chatStart() with Automation mode:
from modules.features.chatPlayground.mainChatPlayground import chatStart
from modules.datamodels.datamodelChat import WorkflowModeEnum, UserInputRequest
userInput = UserInputRequest(
prompt="User prompt with embedded plan...",
listFileId=["file1", "file2"],
userLanguage="en"
)
workflow = await chatStart(
currentUser=user,
userInput=userInput,
workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
workflowId=None
)
Method 3: Via AutomationDefinition
Create automation definition and execute:
automation = chatInterface.createAutomationDefinition({
"label": "Daily Report Generation",
"template": "Template with embedded plan...",
"placeholders": {"DATE": "2025-01-25"},
"active": True
})
workflow = await chatInterface.executeAutomation(automation.id)
Execution Methods:
1. Manual Execution (Immediate)
Via HTTP API:
POST /api/automations/{automationId}/execute
Via Interface:
chatInterface = getChatInterface(currentUser)
workflow = await chatInterface.executeAutomation(automationId)
Execution Process (executeAutomation() method):
- Loads AutomationDefinition from database
- Replaces placeholders in the template ({{PLACEHOLDER_NAME}} patterns)
- Extracts JSON plan from the template (between <!--TEMPLATE_PLAN_START--> and <!--TEMPLATE_PLAN_END--> markers)
- Gets the creator user from the automation's _createdBy field
- Creates UserInputRequest with the embedded plan
- Starts workflow with WORKFLOW_AUTOMATION mode using the creator user's context
- Sets workflow name to "automated: {automationLabel}"
- Logs execution to the executionLogs array (keeps last 50)
2. Scheduled Execution
Automations with schedule field (cron expression) are executed by scheduler service:
- Set the schedule field (e.g., "0 22 * * *" for daily at 22:00)
- Set active=True
- Scheduler triggers automation at scheduled times
- Uses event system to call automation handler
3. Event-Driven Execution
Automations with eventId field are triggered by external events:
- Set the eventId field with an event identifier
- Set active=True
- External events trigger automation execution
- Uses same execution flow as scheduled execution
Important: Automations execute with the creator user's context (_createdBy), not the current user. This ensures consistent permissions and data access.
Template Placeholders
When using AutomationDefinition, templates support placeholder replacement using {{PLACEHOLDER_NAME}} syntax:
Built-in Placeholders:
- {{CURRENT_DATE}}: Current date in ISO format
- {{CURRENT_TIME}}: Current timestamp
- {{USER_ID}}: Current user ID (creator user's ID)
- {{MANDATE_ID}}: Current mandate ID (creator user's mandate)
Custom Placeholders: Defined in the `placeholders` dictionary of AutomationDefinition
Replacement: `ChatObjects._replacePlaceholders()` replaces all placeholders before execution. Placeholders are replaced in the template before the JSON plan is extracted.
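As a rough sketch of this replacement step (the standalone function below is illustrative; the built-in value formats and the merge order of custom placeholders are assumptions):

```python
from datetime import datetime, timezone

def replacePlaceholders(template: str, placeholders: dict, userId: str, mandateId: str) -> str:
    """Replace {{NAME}} patterns with built-in and custom values before the plan is extracted."""
    now = datetime.now(timezone.utc)
    values = {
        "CURRENT_DATE": now.date().isoformat(),   # {{CURRENT_DATE}}
        "CURRENT_TIME": now.isoformat(),          # {{CURRENT_TIME}}
        "USER_ID": userId,                        # creator user's ID
        "MANDATE_ID": mandateId,                  # creator user's mandate
        **placeholders,                           # custom placeholders from the AutomationDefinition
    }
    for name, value in values.items():
        template = template.replace("{{" + name + "}}", str(value))
    return template
```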
Execution Logs
Each automation execution creates a log entry stored in the executionLogs array:
Log Entry Structure:
- `timestamp`: Execution start time (UTC timestamp)
- `workflowId`: Created workflow ID (UUID)
- `status`: Execution status ("running", "completed", or "error")
- `messages`: Array of execution messages (e.g., "Started execution", "Template placeholders replaced successfully", "Workflow started successfully")
Log Management:
- Logs are appended to `executionLogs` array after each execution
- Only last 50 executions are kept (older logs are automatically removed)
- Logs are updated even on errors (with error status and error message)
- Logs can be retrieved via `GET /api/automations/{automationId}` endpoint
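A minimal sketch of this bookkeeping (the helper name is hypothetical; the field names follow the log entry structure above):

```python
import time

MAX_EXECUTION_LOGS = 50  # only the most recent executions are retained

def appendExecutionLog(executionLogs: list, workflowId: str, status: str, messages: list) -> list:
    """Append one execution log entry and trim the list to the last 50 entries."""
    executionLogs.append({
        "timestamp": time.time(),   # execution start time (UTC timestamp)
        "workflowId": workflowId,   # created workflow ID (UUID)
        "status": status,           # "running", "completed", or "error"
        "messages": messages,       # e.g. ["Started execution", "Workflow started successfully"]
    })
    return executionLogs[-MAX_EXECUTION_LOGS:]
```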
Document Reference Flow
Routine and predefined workflows use document references to pass data between actions:
Initial Documents: User-uploaded files are available as docList:round1_usercontext
Action Results: Actions create documents with labels specified in execResultLabel:
- Action with `execResultLabel: "extracted_content"` creates documents labeled `extracted_content`
- Subsequent actions reference these via `documentList: ["docList:extracted_content"]`
Reference Resolution: The system automatically resolves document references:
- `docList:round1_usercontext` → All documents from user's first message
- `docList:extracted_content` → All documents with label "extracted_content" (newest)
- `docItem:<id>:<filename>` → Specific document by ID
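These reference strings break down mechanically into their components. The small parser below is illustrative only, not the gateway's implementation, and it also covers the `docList:<messageId>:<label>` form described later:

```python
def parseDocumentReference(reference: str) -> dict:
    """Split a document reference string into its components."""
    kind, _, rest = reference.partition(":")
    if kind == "docItem":
        documentId, _, fileName = rest.partition(":")      # docItem:<id>:<filename>
        return {"type": "docItem", "id": documentId, "fileName": fileName}
    if kind == "docList":
        first, sep, second = rest.partition(":")
        if sep:                                            # docList:<messageId>:<label>
            return {"type": "docList", "messageId": first, "label": second}
        return {"type": "docList", "label": first}         # docList:<label>
    raise ValueError(f"Unknown document reference: {reference}")

# parseDocumentReference("docList:round1_usercontext")
#   -> {"type": "docList", "label": "round1_usercontext"}
```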
Example Flow:
- User uploads documents → Available as `docList:round1_usercontext`
- Extract action processes `docList:round1_usercontext` → Creates `docList:extracted_content`
- AI action processes `docList:extracted_content` → Creates `docList:analysis_result`
- Generate action processes `docList:analysis_result` → Creates `docList:final_report`
Task Dependencies
Routine and predefined workflows support task dependencies to ensure correct execution order:
Dependency Declaration: Use dependencies array in TaskStep:
{
"id": "task2",
"objective": "Process results from task1",
"dependencies": ["task1"],
"actions": [...]
}
Execution Order: Tasks execute in dependency order:
- Tasks with no dependencies execute first
- Dependent tasks wait for their dependencies to complete
- Parallel execution possible for independent tasks
Dependency Validation: System validates dependencies before execution:
- Ensures all referenced task IDs exist
- Detects circular dependencies
- Validates dependency chain
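The validation and ordering rules can be illustrated with a small standalone sketch (this is not the gateway's actual scheduler); tasks whose dependencies are already satisfied in the same pass are candidates for parallel execution:

```python
def resolveExecutionOrder(tasks: list) -> list:
    """Return task IDs in dependency order; raise on missing or circular dependencies."""
    dependencies = {task["id"]: set(task.get("dependencies", [])) for task in tasks}
    knownIds = set(dependencies)
    for taskId, deps in dependencies.items():
        missing = deps - knownIds
        if missing:
            raise ValueError(f"Task '{taskId}' references unknown dependencies: {missing}")
    ordered, resolved = [], set()
    while dependencies:
        ready = [taskId for taskId, deps in dependencies.items() if deps <= resolved]
        if not ready:
            raise ValueError("Circular dependency detected among: " + ", ".join(dependencies))
        for taskId in ready:  # tasks in the same batch are independent and could run in parallel
            ordered.append(taskId)
            resolved.add(taskId)
            del dependencies[taskId]
    return ordered
```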
Parameterization and Placeholders
Routine and predefined workflows can use placeholders for variable inputs:
In AutomationDefinition Templates:
- Define placeholders in `placeholders` dictionary
- Use `{{PLACEHOLDER_NAME}}` syntax in template
- Replace placeholders before execution
In Direct JSON Plans:
- Use string interpolation or template replacement before embedding
- Replace variables with actual values
- Ensure JSON remains valid after replacement
Built-in Placeholders (when using AutomationDefinition):
- `{{CURRENT_DATE}}`: Current date in ISO format
- `{{CURRENT_TIME}}`: Current timestamp
- `{{USER_ID}}`: Current user ID
- `{{MANDATE_ID}}`: Current mandate ID
Error Handling
Routine and predefined workflows have no automatic retry logic, so error handling must be explicit:
Action-Level Errors: Actions should handle errors gracefully:
- Catch exceptions and return `ActionResult` with `success=False`
- Include error messages for debugging
- Don't let exceptions propagate
Task-Level Errors: Tasks continue even if individual actions fail:
- Failed actions are logged but don't stop task execution
- Subsequent actions still execute
- Task success depends on action results
Workflow-Level Errors: Workflow fails if critical tasks fail:
- Workflow status set to "failed"
- Error message created
- Execution stops
Best Practices:
- Include error handling actions in plans
- Validate inputs before processing
- Use try-catch patterns in action implementations
- Log errors with context for debugging
Best Practices
1. Validate Plan Structure
Before embedding plans, validate JSON structure:
- Ensure all required fields are present
- Validate task IDs are unique
- Check action method/action names exist
- Verify document references are correct
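A lightweight validator along these lines can catch most structural problems before a plan is embedded. The top-level `tasks` key and the field names are assumptions based on the plan structure shown earlier:

```python
def validatePlan(plan: dict) -> list:
    """Collect structural problems in an embedded plan; an empty list means the plan looks valid."""
    problems = []
    tasks = plan.get("tasks", [])            # top-level key name is an assumption
    if not tasks:
        problems.append("Plan contains no tasks")
    seenIds = set()
    for task in tasks:
        taskId = task.get("id")
        if not taskId:
            problems.append("Task without an 'id' field")
            continue
        if taskId in seenIds:
            problems.append(f"Duplicate task id: {taskId}")
        seenIds.add(taskId)
        if not task.get("objective"):
            problems.append(f"Task '{taskId}' has no objective")
        if not task.get("actions"):
            problems.append(f"Task '{taskId}' has no actions")
    for task in tasks:
        for dep in task.get("dependencies", []):
            if dep not in seenIds:
                problems.append(f"Task '{task.get('id')}' depends on unknown task '{dep}'")
    return problems
```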
2. Test Incrementally
Build and test workflows incrementally:
- Start with single task, single action
- Add tasks one at a time
- Test each addition before proceeding
- Verify document flow between tasks
3. Use Meaningful IDs
Use descriptive IDs for tasks and actions:
- `extract_content` instead of `task1`
- `ai_analyze` instead of `action2`
- Makes plans more readable and maintainable
4. Document Plans
Include clear descriptions:
- `overview`: Explain overall workflow purpose
- `objective`: Describe what each task accomplishes
- `userMessage`: Provide user-facing descriptions
5. Keep Plans Simple
Keep plans simple: Automation mode has no retry logic, so fewer, simpler steps mean fewer failure points, and each action must be robust and handle errors gracefully.
6. Use Result Labels
Set execResultLabel on actions to route results to subsequent actions via document references.
7. Document Dependencies
Use task dependencies array to ensure correct execution order.
8. Test Thoroughly
Test automation plans manually before creating AutomationDefinition records.
9. Version Control
Store automation plans in version control for change tracking.
10. Monitor Execution
Review executionLogs regularly to identify failures or performance issues.
11. Use Placeholders
Leverage placeholders for variable inputs rather than hardcoding values.
12. Handle Edge Cases
Consider edge cases in plans:
- Empty document lists
- Missing connections
- Invalid parameters
- Timeout scenarios
13. Keep Plans Maintainable
Design plans for long-term maintenance:
- Use clear, consistent structure
- Avoid deeply nested dependencies
- Keep tasks focused and atomic
- Document complex logic
14. Include Error Handling
Include error handling actions in plans for critical workflows.
Comparison: Routine/Predefined vs AI-Planned Workflows
| Aspect | Routine/Predefined Workflows | AI-Planned Workflows |
|---|---|---|
| Planning Time | Instant (no AI calls) | Slower (AI planning overhead) |
| Determinism | Fully deterministic | May vary between runs |
| Flexibility | Fixed sequence | Adapts to input |
| Error Recovery | Manual (no retries) | Automatic retries with improvements |
| Complexity | Requires plan design | AI handles complexity |
| Control | Full control over steps | AI decides steps |
| Best For | Routine, repeated workflows | Novel, exploratory workflows |
Advanced Workflow Patterns
Pattern 1: Conditional Execution
Use multiple tasks with dependencies to create conditional flows:
- Task A: Check condition
- Task B: Execute if condition met (depends on A)
- Task C: Execute if condition not met (depends on A)
Pattern 2: Parallel Processing
Execute independent tasks in parallel by removing dependencies:
- Task 1: Process documents (no dependencies)
- Task 2: Fetch external data (no dependencies)
- Task 3: Combine results (depends on Task 1 and Task 2)
Pattern 3: Iterative Processing
Use document references to create loops:
- Task 1: Process batch (creates `docList:batch1`)
- Task 2: Process results (processes `docList:batch1`, creates `docList:batch2`)
- Task 3: Check if done (processes `docList:batch2`)
- Repeat pattern for multiple iterations
Pattern 4: Error Recovery
Include error handling tasks:
- Task 1: Primary operation
- Task 2: Error handling (depends on Task 1, executes if Task 1 fails)
- Task 3: Retry operation (depends on Task 2)
Method System and Creating Actions
The method system provides an extensible framework for defining reusable actions that workflows can invoke. Methods encapsulate specific capabilities and expose them through decorated action functions.
Understanding Methods
Methods: Plugin-like classes that inherit from MethodBase and expose actions via @action decorator
Actions: Async methods decorated with @action that perform specific operations
Automatic Discovery: Methods are discovered at runtime via introspection, no manual registration required
Global Catalog: Discovered methods are stored in global methods dictionary for lookup during execution
Creating a New Method
Step 1: Create Method Class
Create a new file in modules/workflows/methods/ (e.g., methodNewCapability.py):
- Inherit from `MethodBase`
- Initialize with `services` object in `__init__`
- Define class-level `name` and `description` attributes
Step 2: Define Actions
Create async methods decorated with @action:
- First parameter must be `parameters: Dict[str, Any]`
- Return `ActionResult` with `success`, `documents`, and optional `error`
- Include comprehensive docstring with parameter descriptions
- Handle exceptions and return `ActionResult` with `success=False`
Step 3: Action Requirements
- Must be async (workflow execution is asynchronous)
- Must accept `parameters: Dict[str, Any]` as first argument
- Must return `ActionResult` with success, documents, and error fields
- Should NOT set `resultLabel` in ActionResult (managed by action handler)
- Should include comprehensive docstring with parameter descriptions
- Should handle exceptions and return `ActionResult` with `success=False`
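Putting steps 1–3 together, a new method file might look roughly like this. The import paths for `MethodBase`, `action`, `ActionResult`, and `ActionDocument` are assumptions, as is the exact shape of the resolved ChatDocument objects; treat this as a sketch rather than a drop-in implementation:

```python
# modules/workflows/methods/methodNewCapability.py  (illustrative)
from typing import Any, Dict

# Import paths below are assumptions; adjust them to the actual module layout.
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument


class MethodNewCapability(MethodBase):
    # Class-level metadata used by discovery and planning prompts.
    name = "newCapability"
    description = "Example capability that upper-cases the content of referenced documents."

    def __init__(self, services):
        self.services = services  # access to chat, ai, extraction, generation, ...

    @action
    async def uppercaseDocuments(self, parameters: Dict[str, Any]) -> ActionResult:
        """Upper-case the content of the referenced documents.

        Parameters:
            documentList: List of document references, e.g. ["docList:extracted_content"].
        """
        try:
            documentList = parameters.get("documentList", [])
            # Resolve references to ChatDocument objects (assumed awaitable here).
            chatDocuments = await self.services.chat.getChatDocumentsFromDocumentList(documentList)
            results = [
                ActionDocument(
                    data=doc.data.upper(),            # attribute names on ChatDocument are assumptions
                    mimeType="text/plain",
                    fileName=f"upper_{doc.fileName}",
                )
                for doc in chatDocuments
            ]
            # resultLabel is intentionally not set; the action handler manages it.
            return ActionResult(success=True, documents=results)
        except Exception as exc:
            return ActionResult(success=False, documents=[], error=str(exc))
```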
Step 4: Automatic Discovery
Methods are automatically discovered when discoverMethods(services) is called:
- Scans `modules/workflows/methods/` directory
- Identifies classes inheriting from `MethodBase`
- Inspects classes for `@action` decorated methods
- Extracts action signatures and metadata
- Stores in global methods catalog
Step 5: Services Access
Methods have access to self.services providing:
- `services.workflow`: Current workflow object
- `services.chat`: Chat service for workflow operations
- `services.ai`: AI service for AI operations
- `services.extraction`: Extraction service for document processing
- `services.generation`: Generation service for document creation
- All other services and interfaces
Action Signature Generation
Action signatures are automatically generated for AI prompt generation:
Process:
- Inspect action method signature using `inspect.signature()`
- Extract parameter names, types, defaults, and descriptions
- Parse docstring for parameter documentation
- Generate formatted signature string
Usage: Signatures are included in action planning prompts so AI models understand available actions and their parameters.
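A simplified version of this signature extraction could look like the following (illustrative only; the real formatting may differ):

```python
import inspect

async def exampleAction(self, parameters: dict, timeout: int = 60):
    """Example action used only to demonstrate signature extraction."""

def describeAction(func) -> str:
    """Build a readable signature string plus summary line for planning prompts."""
    parts = []
    for name, param in inspect.signature(func).parameters.items():
        if name == "self":
            continue
        annotation = param.annotation
        typeName = "" if annotation is inspect.Parameter.empty else \
            f": {getattr(annotation, '__name__', None) or annotation}"
        default = "" if param.default is inspect.Parameter.empty else f" = {param.default!r}"
        parts.append(f"{name}{typeName}{default}")
    docLines = (inspect.getdoc(func) or "").splitlines()
    summary = f" - {docLines[0]}" if docLines else ""
    return f"{func.__name__}({', '.join(parts)}){summary}"

print(describeAction(exampleAction))
# exampleAction(parameters: dict, timeout: int = 60) - Example action used only to demonstrate signature extraction.
```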
Document References in Actions
Actions receive document references in parameters:
Resolving References: Use services.chat.getChatDocumentsFromDocumentList(documentList) to resolve references to actual ChatDocument objects.
Reference Formats:
- `docItem:<id>:<filename>`: Single document by ID
- `docList:<label>`: All documents with label (newest)
- `docList:<messageId>:<label>`: Documents from specific message
Usage: Actions typically receive documentList parameter containing array of references, resolve them, process content, and return results as ActionDocument objects.
Returning Action Results
Actions return ActionResult objects:
Structure:
- `success`: Boolean indicating execution success
- `documents`: List of ActionDocument objects with results
- `error`: Optional error message string (if `success=False`)
ActionDocument Structure:
- `data`: String content or binary data
- `mimeType`: MIME type of the content
- `fileName`: Suggested filename
- `metadata`: Optional dictionary with additional metadata
Result Conversion: ActionDocuments are automatically converted to ChatDocument objects by ActionExecutor and stored in database.
Result Labels: Actions should NOT set resultLabel in ActionResult. The label is managed by the action handler using the action's execResultLabel from the action plan.
Method Discovery and Services Update
Critical Mechanism: When discoverMethods(services) is called with a new services object:
- Checks if method already exists in catalog (from previous discovery)
- If exists, updates `instance.services` reference to new services object
- Ensures cached method instances use current workflow, not stale workflow from previous request
- Prevents workflow ID mismatches and cross-workflow contamination
When Called: discoverMethods() is called at workflow start in WorkflowManager.workflowStart() to ensure methods use correct workflow context.
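The refresh behaviour can be summarised in a few lines; apart from the global `methods` catalog named above, the helper and its arguments are illustrative:

```python
# Global catalog of discovered method instances, keyed by method name.
methods: dict = {}

def registerMethod(name: str, methodClass, services) -> None:
    """Instantiate a method once, then only refresh its services reference on later discoveries."""
    if name in methods:
        # Reuse the cached instance, but point it at the current services/workflow context
        # so it does not keep a stale workflow from a previous request.
        methods[name].services = services
    else:
        methods[name] = methodClass(services)
```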
Workflow State Management
Workflows maintain state in two places: in-memory workflow object and database persistence. State synchronization ensures consistency between both representations.
Workflow State Fields
Progress Tracking:
- `currentRound`: Which user interaction round (incremented each time user sends new input)
- `currentTask`: Which task in current task plan (incremented as tasks execute)
- `currentAction`: Which action within current task (incremented as actions execute)
- `totalTasks`: Number of tasks in current task plan (set after planning)
- `totalActions`: Number of actions in current task (set after action planning)
Status Fields:
- `status`: Workflow status ("running", "completed", "stopped", "failed")
- `workflowMode`: Execution mode (WORKFLOW_ACTIONPLAN, WORKFLOW_DYNAMIC, WORKFLOW_AUTOMATION)
- `maxSteps`: Maximum steps for Dynamic mode (default: 5)
Timestamps:
- `startedAt`: When workflow was created
- `lastActivity`: Last activity timestamp (updated during execution)
Related Data:
- `messages`: List of ChatMessage objects (loaded from database)
- `logs`: List of ChatLog objects (execution logs)
- `stats`: List of ChatStat objects (performance statistics)
State Synchronization
Update Pattern: All workflow state updates follow two-step pattern:
- Update in-memory workflow object for immediate access
- Persist change to database via `services.chat.updateWorkflow()`
Consistency: This ensures consistency between memory and persistence while maintaining fast access during execution.
State Update Methods: WorkflowProcessor provides methods for updating state at key points:
- `updateWorkflowAfterTaskPlanCreated()`: Sets totalTasks after planning
- `updateWorkflowBeforeExecutingTask()`: Updates currentTask before execution
- `updateWorkflowAfterActionPlanning()`: Sets totalActions after action planning
- `updateWorkflowBeforeExecutingAction()`: Updates currentAction before action execution
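One of these helpers, written out as a sketch of the two-step pattern (the argument shapes and the `updateWorkflow()` call signature are assumptions):

```python
from datetime import datetime, timezone

async def updateWorkflowBeforeExecutingTask(services, workflow, taskIndex: int) -> None:
    """Two-step state update: mutate the in-memory workflow first, then persist it."""
    workflow.currentTask = taskIndex                     # step 1: immediate in-memory access
    workflow.lastActivity = datetime.now(timezone.utc)   # keep the activity timestamp fresh
    await services.chat.updateWorkflow(workflow)         # step 2: persist via the chat service
```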
Workflow Stopping
Stop Mechanism: checkWorkflowStopped() utility checks if workflow status is "stopped" and raises WorkflowStoppedException if so.
Strategic Placement: Called at all major workflow checkpoints:
- Before task planning
- Before task execution
- Before action execution
- Before AI calls
- Before result processing
Graceful Shutdown: When exception is raised and caught, workflow performs cleanup, persists current state, creates stop message, and updates status consistently.
Stop Endpoint: WorkflowManager.workflowStop() updates workflow status to "stopped" in both memory and database, records stop event in logs.
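In sketch form, the stop check amounts to a guard raised at each checkpoint (the workflow's `id` attribute used in the message is an assumption):

```python
class WorkflowStoppedException(Exception):
    """Raised when a workflow has been stopped and execution should unwind cleanly."""

def checkWorkflowStopped(workflow) -> None:
    """Raise if the workflow was stopped; called at every major checkpoint."""
    if workflow.status == "stopped":
        raise WorkflowStoppedException(f"Workflow {workflow.id} was stopped")

# Typical placement inside the processing loop (simplified):
#   checkWorkflowStopped(workflow)   # before task planning
#   checkWorkflowStopped(workflow)   # before executing each task, action, and AI call
```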
Document State
Documents are tracked throughout workflow execution:
Document References: Stable identifiers for documents:
- `docItem:<id>:<filename>`: Individual document
- `docList:<label>`: Group of documents
- `docList:<messageId>:<label>`: Specific message's documents
Document Labels: Context-aware naming:
- `round{n}_usercontext`: User-provided context
- `round{n}_task{t}_action{a}_{purpose}`: Action results
- `taskplan`: Task plan message
Message Association: Documents linked to messages via messageId field, enabling document resolution and routing.
Workflow Engineering Best Practices
1. Choose the Right Mode
Actionplan Mode: Use for complex workflows with specific requirements, when upfront planning leads to better execution, when quality control is critical.
Dynamic Mode: Use for exploratory workflows, when path forward depends on intermediate results, when iteration is needed, for research and analysis tasks.
Automation Mode: Use for repeated workflows, batch processing, scheduled jobs, when exact execution sequence must be guaranteed, for routine operations.
2. Design Task Plans Carefully
Clear Objectives: Each task should have a clear, measurable objective.
Success Criteria: Define specific, testable success criteria for each task.
Dependencies: Use task dependencies to ensure correct execution order.
Atomic Tasks: Keep tasks focused on single, well-defined goals.
Reasonable Scope: Don't make tasks too large or too small.
3. Use Document References Effectively
Label Documents: Use meaningful labels that encode workflow context (round/task/action).
Reference Resolution: Always resolve document references before processing.
Document Lineage: Track document lineage through workflow for debugging and auditing.
Result Routing: Use execResultLabel to route action results to subsequent actions.
4. Handle Errors Gracefully
Action-Level Errors: Actions should catch exceptions and return ActionResult with success=False.
Task-Level Errors: Tasks should handle action failures and continue or retry as appropriate.
Workflow-Level Errors: Workflows should handle task failures and provide meaningful error messages.
Error Logging: Log errors with full context for debugging.
5. Optimize for Performance
Parallel Processing: Use parallel processing where possible (e.g., processing multiple documents simultaneously).
Chunking: Chunk large documents for model-aware processing.
Caching: Cache expensive operations where appropriate.
Progress Tracking: Use progress logging for long-running operations.
6. Test Workflows Thoroughly
Unit Testing: Test individual actions and methods in isolation.
Integration Testing: Test complete workflows with realistic data.
Error Scenarios: Test error handling and edge cases.
Performance Testing: Test workflows under load.
7. Monitor and Debug
Logging: Use structured logging with context information.
Progress Tracking: Use progress logging for visibility into long-running workflows.
Statistics: Track workflow statistics (costs, processing time, error rates).
Debugging: Use workflow state and logs to debug issues.
8. Document Workflows
Purpose: Document what each workflow does and why.
Inputs: Document required inputs and parameters.
Outputs: Document expected outputs and formats.
Dependencies: Document dependencies on other workflows or services.
Examples: Provide examples of workflow usage.
Workflow Engineering Examples
Example 1: Simple Document Analysis Workflow (Actionplan Mode)
Use Case: Analyze uploaded documents and generate summary report
Workflow Steps:
- User uploads documents via API
- Workflow starts with WORKFLOW_ACTIONPLAN mode
- AI generates task plan: "Extract content", "Analyze content", "Generate report"
- Task 1: Extract content from documents using `extraction.extractContent()`
- Task 2: Analyze content using `ai.callAiDocuments()` with analysis prompt
- Task 3: Generate report using `generation.renderReport()` to PDF format
- Store results as ChatDocument and return to user
Key Actions:
- `ai.process()`: Process documents with AI
- `generation.renderReport()`: Generate formatted report
Example 2: Research Workflow (Dynamic Mode)
Use Case: Research a topic iteratively, gathering information until sufficient
Workflow Steps:
- User provides research question
- Workflow starts with WORKFLOW_DYNAMIC mode (maxSteps=10)
- AI generates task plan: "Research topic"
- Dynamic execution:
  - Action 1: Search web using `ai.process()` with search prompt
  - Evaluate: Is information sufficient? No → continue
  - Action 2: Analyze search results using `ai.process()` with analysis prompt
  - Evaluate: Is information sufficient? No → continue
  - Action 3: Search for additional sources
  - Evaluate: Is information sufficient? Yes → complete
- Generate summary and return results
Key Actions:
- `ai.process()`: Iterative AI processing with different prompts
Example 3: Daily Report Automation (Automation Mode)
Use Case: Generate daily report from SharePoint documents automatically
Automation Definition:
- Schedule: Daily at 9:00 AM
- Template: "Generate daily report from SharePoint documents"
- Placeholders: `{{REPORT_DATE}}`, `{{SHAREPOINT_FOLDER}}`
Workflow Plan:
- Task 1: Search SharePoint for documents from `{{REPORT_DATE}}`
  - Action: `sharepoint.search()` with date filter
- Task 2: Download documents from SharePoint
  - Action: `sharepoint.download()` for each document found
- Task 3: Extract content from documents
  - Action: `extraction.extractContent()` for all documents
- Task 4: Generate report
  - Action: `ai.process()` with report generation prompt
  - Action: `generation.renderReport()` to PDF format
- Task 5: Upload report to SharePoint
  - Action: `sharepoint.upload()` with report document
Key Actions:
- `sharepoint.search()`: Find documents
- `sharepoint.download()`: Download documents
- `extraction.extractContent()`: Extract content
- `ai.process()`: Generate report content
- `generation.renderReport()`: Create PDF report
- `sharepoint.upload()`: Upload final report
Related Documentation
- Architecture Overview - High-level system architecture
- Connectors Component - Detailed connector documentation
- Datamodels & Interfaces Component - Interface layer details
- Services Component - Service layer documentation
- Services API Reference - Complete service API reference
- Workflows Component - Workflow orchestration details
- Security Component - Security and authentication
Document Version: 1.0
Last Updated: 2025-01-25
Status: Complete