gateway/modules/interfaces/interfaceDbChatObjects.py
2025-11-04 21:14:03 +01:00

1683 lines
No EOL
73 KiB
Python

"""
Interface to LucyDOM database and AI Connectors.
Uses the JSON connector for data access with added language support.
"""
import logging
import uuid
import math
from typing import Dict, Any, List, Optional, Union
import asyncio
from modules.interfaces.interfaceDbChatAccess import ChatAccess
from modules.datamodels.datamodelChat import (
ChatDocument,
ChatStat,
ChatLog,
ChatMessage,
ChatWorkflow,
WorkflowModeEnum,
AutomationDefinition,
UserInputRequest
)
import json
from modules.datamodels.datamodelUam import User
# DYNAMIC PART: Connectors to the Interface
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.timezoneUtils import getUtcTimestamp
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResult
# Basic Configurations
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Singleton factory for Chat instances
_chatInterfaces = {}
class ChatObjects:
"""
Interface to Chat database and AI Connectors.
Uses the JSON connector for data access with added language support.
"""
def __init__(self, currentUser: Optional[User] = None):
    """
    Build the chat interface, optionally bound to a user.

    Services and the database connection are set up immediately. When a
    user is supplied, the user context is established as the last step.
    """
    # Identity attributes stay None until a user is known
    self.currentUser = currentUser
    if currentUser:
        self.userId = currentUser.id
        self.mandateId = currentUser.mandateId
    else:
        self.userId = None
        self.mandateId = None
    # Access control object is created by setUserContext()
    self.access = None

    # Services first, then the database connection
    self._initializeServices()
    self._initializeDatabase()

    # Bind the user last: setUserContext() uses self.db
    if currentUser:
        self.setUserContext(currentUser)
# ===== Generic Utility Methods =====
def _isObjectField(self, fieldType) -> bool:
    """Tell whether a field type is a complex object rather than a plain scalar."""
    # str, int, float, bool and NoneType count as scalars; anything else is an object
    scalarTypes = (str, int, float, bool, type(None))
    return fieldType not in scalarTypes
def _separateObjectFields(self, model_class, data: Dict[str, Any]) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Split a data dict into database-storable fields and object fields.

    Args:
        model_class: Pydantic v2 model class; its model_fields drive the routing.
        data: Field name to value mapping to split.

    Returns:
        Tuple (simpleFields, objectFields). simpleFields receives scalar
        values, dict/list-annotated (JSONB) fields and "_"-prefixed metadata.
        objectFields receives everything else, including the relational
        'documents' and 'stats' entries.
    """
    scalarTypes = (str, int, float, bool, type(None))
    relationalNames = ['documents', 'stats']
    jsonbNames = ['execParameters', 'expectedDocumentFormats', 'resultDocuments']
    declaredFields = model_class.model_fields

    storable: Dict[str, Any] = {}
    detached: Dict[str, Any] = {}

    for key, item in data.items():
        if key not in declaredFields:
            # Unknown to the model: keep connector metadata ("_" prefix) and
            # plain scalars, set everything else aside
            if key.startswith("_") or isinstance(item, scalarTypes):
                storable[key] = item
            else:
                detached[key] = item
            continue

        # Pydantic v2 only
        annotation = declaredFields[key].annotation

        # Relational fields are always set aside for separate handling
        if key in relationalNames:
            detached[key] = item
            continue

        # dict/list annotations (bare or parameterized) and the explicitly
        # named fields are stored as JSONB alongside the scalar columns
        isJsonb = (
            annotation == dict
            or annotation == list
            or (hasattr(annotation, '__origin__') and annotation.__origin__ in (dict, list))
            or key in jsonbNames
        )
        if isJsonb or isinstance(item, scalarTypes):
            storable[key] = item
        else:
            detached[key] = item

    return storable, detached
def _initializeServices(self):
    """Hook for service setup; currently does nothing."""
    return None
def setUserContext(self, currentUser: User):
    """
    Bind this interface to a user.

    Stores the user and its ids, creates the access-control helper and
    passes the user id on to the database connector.

    Raises:
        ValueError: If the user has no id or no mandateId.
    """
    # Keep the User object itself plus the two ids used for scoping
    self.currentUser = currentUser
    self.userId = currentUser.id
    self.mandateId = currentUser.mandateId
    if not (self.userId and self.mandateId):
        raise ValueError("Invalid user context: id and mandateId are required")

    # Language preference of the user
    self.userLanguage = currentUser.language

    # Access control is built from the User object and the database connector
    self.access = ChatAccess(self.currentUser, self.db)

    # Tell the connector which user is acting
    self.db.updateContext(self.userId)
def __del__(self):
    """Best-effort teardown: close the database connection if one exists."""
    connection = getattr(self, 'db', None)
    if connection is None:
        # Never connected (or already cleared): nothing to release
        return
    try:
        connection.close()
    except Exception as e:
        # A failing close is only logged, never raised from the finalizer
        logger.error(f"Error closing database connection: {e}")
def _initializeDatabase(self):
    """
    Open the chat database connection and initialize the database system.

    Connection settings come from APP_CONFIG (DB_CHAT_* keys); host,
    database name and port have fallback values. Any failure is logged and
    re-raised.
    """
    try:
        # Gather the connector arguments in one place
        connectorSettings = {
            "dbHost": APP_CONFIG.get("DB_CHAT_HOST", "_no_config_default_data"),
            "dbDatabase": APP_CONFIG.get("DB_CHAT_DATABASE", "chat"),
            "dbUser": APP_CONFIG.get("DB_CHAT_USER"),
            "dbPassword": APP_CONFIG.get("DB_CHAT_PASSWORD_SECRET"),
            "dbPort": int(APP_CONFIG.get("DB_CHAT_PORT", 5432)),
            "userId": self.userId,
        }
        # Create the connector and let it prepare its system structures
        self.db = DatabaseConnector(**connectorSettings)
        self.db.initDbSystem()
        logger.info("Database initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {str(e)}")
        raise
def _initRecords(self):
    """Hook for seeding standard records; currently does nothing."""
    return None
def _uam(self, model_class: type, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Run records through access control and strip "_"-prefixed fields.

    Args:
        model_class: Model class the records belong to.
        recordset: Raw records as returned by the database connector.

    Returns:
        The records kept by self.access.uam(), each copied without the keys
        that start with an underscore (database-specific fields).
    """
    permitted = self.access.uam(model_class, recordset)
    return [
        {key: value for key, value in entry.items() if not key.startswith('_')}
        for entry in permitted
    ]
def _canModify(self, model_class: type, recordId: Optional[str] = None) -> bool:
    """Ask the access-control module whether the model (or one record of it) may be modified."""
    allowed = self.access.canModify(model_class, recordId)
    return allowed
def _applyFilters(self, records: List[Dict[str, Any]], filters: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Placeholder for record filtering.

    The filters argument is accepted but not evaluated yet; the records are
    handed back unchanged.
    """
    # TODO: Implement filtering logic when needed
    unfiltered = records
    return unfiltered
def _applySorting(self, records: List[Dict[str, Any]], sortFields: List[Any]) -> List[Dict[str, Any]]:
    """
    Sort records by several fields at once.

    Args:
        records: Records (dicts) to sort. The input list is never reordered.
        sortFields: Sort criteria ordered from most to least significant. Each
            entry is a dict or an object exposing "field" and "direction"
            ("asc" when absent, "desc" for descending). Entries without a
            field name are skipped.

    Returns:
        A new sorted list, or the input list itself when sortFields is empty.
        Records whose sort value is None come last for BOTH directions.
    """
    if not sortFields:
        return records

    def sortKey(value):
        # Numbers (bool is an int subclass) compare numerically and rank before
        # all other values, which compare by their string form. The leading
        # rank keeps mixed number/string columns from raising TypeError.
        if isinstance(value, (int, float)):
            return (0, value)
        if isinstance(value, str):
            return (1, value)
        return (1, str(value))

    # Work on a copy so the caller's list keeps its order
    sortedRecords = list(records)

    # Apply the least significant field first; list.sort() is stable (also with
    # reverse=True), so earlier passes survive as tie-breakers.
    for sortField in reversed(sortFields):
        if isinstance(sortField, dict):
            fieldName = sortField.get("field")
            direction = sortField.get("direction", "asc")
        else:
            fieldName = getattr(sortField, "field", None)
            direction = getattr(sortField, "direction", "asc")
        if not fieldName:
            continue
        isDesc = (direction == "desc")

        # None values are split off before sorting: folding them into the key
        # would move them to the front whenever reverse=True is used.
        present = []
        missing = []
        for record in sortedRecords:
            if record.get(fieldName) is None:
                missing.append(record)
            else:
                present.append(record)
        present.sort(key=lambda record: sortKey(record.get(fieldName)), reverse=isDesc)
        sortedRecords = present + missing

    return sortedRecords
# Utilities
def getInitialId(self, model_class: type) -> Optional[str]:
    """Look up the initial ID of the table behind model_class via the database connector."""
    initialId = self.db.getInitialId(model_class)
    return initialId
# Workflow methods
def getWorkflows(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
    """
    List the workflows the current user may see.

    Args:
        pagination: Optional paging parameters (page, pageSize, sort,
            filters). When None, every visible workflow is returned.

    Returns:
        A plain list of workflow dicts when pagination is None, otherwise a
        PaginatedResult holding the requested page plus totals.
    """
    # Load everything, then let access control decide what is visible
    visible = self._uam(ChatWorkflow, self.db.getRecordset(ChatWorkflow))

    # Without pagination the visible set is the answer
    if pagination is None:
        return visible

    # Filters, then sorting, each only when requested
    if pagination.filters:
        visible = self._applyFilters(visible, pagination.filters)
    if pagination.sort:
        visible = self._applySorting(visible, pagination.sort)

    # Totals are counted after filtering
    count = len(visible)
    pageCount = math.ceil(count / pagination.pageSize) if count > 0 else 0

    # Cut out the requested page (pages are 1-based)
    offset = (pagination.page - 1) * pagination.pageSize
    pageItems = visible[offset:offset + pagination.pageSize]

    return PaginatedResult(
        items=pageItems,
        totalItems=count,
        totalPages=pageCount
    )
def getWorkflow(self, workflowId: str) -> Optional[ChatWorkflow]:
    """
    Return one workflow together with its logs, messages and stats.

    Args:
        workflowId: ID of the workflow to load.

    Returns:
        The ChatWorkflow, or None when the workflow does not exist, is
        removed by access control, or cannot be built/validated (the error
        is logged).
    """
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return None
    filteredWorkflows = self._uam(ChatWorkflow, workflows)
    if not filteredWorkflows:
        return None
    workflow = filteredWorkflows[0]
    try:
        # Load related data from normalized tables
        logs = self.getLogs(workflowId)
        messages = self.getMessages(workflowId)
        stats = self.getStats(workflowId)

        # createWorkflow() reads workflowMode and maxSteps from the stored
        # record; pass them through here as well so a loaded workflow does
        # not fall back to the model defaults. Absent/None values are left
        # to the model.
        persistedExtras = {
            key: workflow[key]
            for key in ("workflowMode", "maxSteps")
            if workflow.get(key) is not None
        }

        # Validate workflow data against ChatWorkflow model
        return ChatWorkflow(
            id=workflow["id"],
            status=workflow.get("status", "running"),
            name=workflow.get("name"),
            currentRound=workflow.get("currentRound", 0),
            currentTask=workflow.get("currentTask", 0),
            currentAction=workflow.get("currentAction", 0),
            totalTasks=workflow.get("totalTasks", 0),
            totalActions=workflow.get("totalActions", 0),
            lastActivity=workflow.get("lastActivity", getUtcTimestamp()),
            startedAt=workflow.get("startedAt", getUtcTimestamp()),
            logs=logs,
            messages=messages,
            stats=stats,
            mandateId=workflow.get("mandateId", self.currentUser.mandateId),
            **persistedExtras
        )
    except Exception as e:
        logger.error(f"Error validating workflow data: {str(e)}")
        return None
def createWorkflow(self, workflowData: Dict[str, Any]) -> ChatWorkflow:
    """
    Store a new workflow and return it as a model.

    Missing "startedAt"/"lastActivity" entries are filled in on workflowData
    with the current UTC timestamp before storing.

    Raises:
        PermissionError: If the user may not create workflows.
    """
    if not self._canModify(ChatWorkflow):
        raise PermissionError("No permission to create workflows")

    # Default both timestamps to "now" when the caller left them out
    now = getUtcTimestamp()
    for stampField in ("startedAt", "lastActivity"):
        if stampField not in workflowData:
            workflowData[stampField] = now

    # Only the storable part of the data goes into the workflow table
    storable, _ = self._separateObjectFields(ChatWorkflow, workflowData)
    record = self.db.recordCreate(ChatWorkflow, storable)

    # A fresh workflow has no logs, messages or stats yet
    return ChatWorkflow(
        id=record["id"],
        status=record.get("status", "running"),
        name=record.get("name"),
        currentRound=record.get("currentRound", 0),
        currentTask=record.get("currentTask", 0),
        currentAction=record.get("currentAction", 0),
        totalTasks=record.get("totalTasks", 0),
        totalActions=record.get("totalActions", 0),
        lastActivity=record.get("lastActivity", now),
        startedAt=record.get("startedAt", now),
        logs=[],
        messages=[],
        stats=[],
        mandateId=record.get("mandateId", self.currentUser.mandateId),
        workflowMode=record["workflowMode"],
        maxSteps=record.get("maxSteps", 1)
    )
def updateWorkflow(self, workflowId: str, workflowData: Dict[str, Any]) -> Optional[ChatWorkflow]:
    """
    Update the stored fields of a workflow.

    Only the workflow record itself is written; logs, messages and stats are
    not modified here and are merely reloaded for the returned model.
    "lastActivity" is always set to the current UTC timestamp.

    Args:
        workflowId: ID of the workflow to update.
        workflowData: Fields to change.

    Returns:
        The updated ChatWorkflow, or None when getWorkflow() returns nothing
        for workflowId.

    Raises:
        PermissionError: If the user may not modify this workflow.
    """
    # Check if the workflow exists and user has access
    workflow = self.getWorkflow(workflowId)
    if not workflow:
        return None
    if not self._canModify(ChatWorkflow, workflowId):
        raise PermissionError(f"No permission to update workflow {workflowId}")

    # Only simple fields are written; object fields are deliberately ignored
    simpleFields, _objectFields = self._separateObjectFields(ChatWorkflow, workflowData)
    simpleFields["lastActivity"] = getUtcTimestamp()
    updated = self.db.recordModify(ChatWorkflow, workflowId, simpleFields)

    # Child entities are maintained through their own methods; just reload them
    logs = self.getLogs(workflowId)
    messages = self.getMessages(workflowId)
    stats = self.getStats(workflowId)

    # Carry workflowMode/maxSteps through as createWorkflow() does, preferring
    # the updated record and falling back to the previously loaded workflow.
    # Values that are unavailable are left to the model defaults.
    persistedExtras = {}
    for key in ("workflowMode", "maxSteps"):
        value = updated.get(key)
        if value is None:
            value = getattr(workflow, key, None)
        if value is not None:
            persistedExtras[key] = value

    # Convert to ChatWorkflow model
    return ChatWorkflow(
        id=updated["id"],
        status=updated.get("status", workflow.status),
        name=updated.get("name", workflow.name),
        currentRound=updated.get("currentRound", workflow.currentRound),
        currentTask=updated.get("currentTask", workflow.currentTask),
        currentAction=updated.get("currentAction", workflow.currentAction),
        totalTasks=updated.get("totalTasks", workflow.totalTasks),
        totalActions=updated.get("totalActions", workflow.totalActions),
        lastActivity=updated.get("lastActivity", workflow.lastActivity),
        startedAt=updated.get("startedAt", workflow.startedAt),
        logs=logs,
        messages=messages,
        stats=stats,
        mandateId=updated.get("mandateId", workflow.mandateId),
        **persistedExtras
    )
def deleteWorkflow(self, workflowId: str) -> bool:
    """
    Remove a workflow together with its messages, documents, stats and logs.

    Document records are deleted, the referenced files are not.

    Returns:
        The result of deleting the workflow record, or False when the
        workflow is not accessible or any step fails. Every exception,
        including the PermissionError raised below, is logged and reported
        as False.
    """
    def removeAll(model, criteria):
        # Delete each record of `model` that matches `criteria`
        for row in self.db.getRecordset(model, recordFilter=criteria):
            self.db.recordDelete(model, row["id"])

    try:
        # Existence and read access are checked via getWorkflow()
        target = self.getWorkflow(workflowId)
        if not target:
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to delete workflow {workflowId}")

        # 1. Messages: their stats and documents first, then the message
        for entry in self.getMessages(workflowId):
            entryId = entry.id
            if not entryId:
                continue
            removeAll(ChatStat, {"messageId": entryId})
            removeAll(ChatDocument, {"messageId": entryId})
            self.db.recordDelete(ChatMessage, entryId)

        # 2. Stats and 3. logs attached to the workflow itself
        removeAll(ChatStat, {"workflowId": workflowId})
        removeAll(ChatLog, {"workflowId": workflowId})

        # 4. The workflow record goes last
        return self.db.recordDelete(ChatWorkflow, workflowId)
    except Exception as e:
        logger.error(f"Error deleting workflow {workflowId}: {str(e)}")
        return False
# Message methods
def getMessages(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatMessage], PaginatedResult]:
    """
    Return the messages of a workflow the user has access to.

    Messages are ordered by publishedAt unless pagination.sort is given.
    Each returned ChatMessage has its documents loaded via getDocuments().

    Args:
        workflowId: The workflow ID to get messages for.
        pagination: Optional pagination parameters. If None, returns all items.

    Returns:
        If pagination is None: List[ChatMessage].
        If pagination is provided: PaginatedResult with items and metadata.
        An empty list/result is returned when the workflow does not exist or
        is removed by access control.
    """
    def emptyResult():
        # Shape of "nothing found" depends on whether pagination was requested
        if pagination is None:
            return []
        return PaginatedResult(items=[], totalItems=0, totalPages=0)

    def toChatMessage(msg):
        # msg carries exactly the ChatMessage keyword arguments except for the
        # documents, which live in their own table
        return ChatMessage(documents=self.getDocuments(msg["id"]), **msg)

    # Check workflow access first (without calling getWorkflow to avoid circular reference)
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return emptyResult()
    if not self._uam(ChatWorkflow, workflows):
        return emptyResult()

    # Get messages for this workflow from normalized table
    messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})

    # Convert raw messages to dict format for sorting/filtering
    messageDicts = []
    for msg in messages:
        # A key that is present but None defeats dict.get() defaults, so the
        # fallback chain publishedAt -> timestamp -> now is resolved
        # explicitly; otherwise the default sort below would compare None
        # with numbers.
        publishedAt = msg.get("publishedAt")
        if publishedAt is None:
            publishedAt = msg.get("timestamp")
        if publishedAt is None:
            publishedAt = getUtcTimestamp()
        messageDicts.append({
            "id": msg.get("id"),
            "workflowId": msg.get("workflowId"),
            "parentMessageId": msg.get("parentMessageId"),
            "documentsLabel": msg.get("documentsLabel"),
            "message": msg.get("message"),
            "role": msg.get("role", "assistant"),
            "status": msg.get("status", "step"),
            "sequenceNr": msg.get("sequenceNr", 0),
            "publishedAt": publishedAt,
            "success": msg.get("success"),
            "actionId": msg.get("actionId"),
            "actionMethod": msg.get("actionMethod"),
            "actionName": msg.get("actionName"),
            "roundNumber": msg.get("roundNumber"),
            "taskNumber": msg.get("taskNumber"),
            "actionNumber": msg.get("actionNumber"),
            "taskProgress": msg.get("taskProgress"),
            "actionProgress": msg.get("actionProgress")
        })

    # Apply default sorting by publishedAt if no sort specified
    if pagination is None or not pagination.sort:
        messageDicts.sort(key=lambda x: x["publishedAt"])

    # If no pagination requested, return all items
    if pagination is None:
        return [toChatMessage(msg) for msg in messageDicts]

    # Apply filtering (if filters provided)
    if pagination.filters:
        messageDicts = self._applyFilters(messageDicts, pagination.filters)

    # Apply sorting (in order of sortFields)
    if pagination.sort:
        messageDicts = self._applySorting(messageDicts, pagination.sort)

    # Count total items after filters
    totalItems = len(messageDicts)
    totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0

    # Apply pagination (skip/limit); documents are loaded for the page only
    startIdx = (pagination.page - 1) * pagination.pageSize
    endIdx = startIdx + pagination.pageSize
    chat_messages = [toChatMessage(msg) for msg in messageDicts[startIdx:endIdx]]

    return PaginatedResult(
        items=chat_messages,
        totalItems=totalItems,
        totalPages=totalPages
    )
def createMessage(self, messageData: Dict[str, Any]) -> Optional[ChatMessage]:
    """
    Create a message (and its documents) for a workflow.

    Missing values are filled in on messageData itself: id, status ("step"),
    role, agentName and the workflow's current round/task/action numbers.

    Args:
        messageData: Message fields; "workflowId" is required. "documents"
            entries may be ChatDocument objects or dicts; a missing or None
            "documents" value means "no documents".

    Returns:
        The created ChatMessage, or None when anything fails (missing
        workflowId, no access or permission, database or validation error).
        Failures are logged, not raised.
    """
    try:
        # Generate an ID when none (or an empty/None one) was supplied
        if not messageData.get("id"):
            messageData["id"] = f"msg_{uuid.uuid4()}"

        # Check required fields
        for field in ("id", "workflowId"):
            if field not in messageData:
                logger.error(f"Required field '{field}' missing in messageData")
                raise ValueError(f"Required field '{field}' missing in message data")

        # Check workflow access
        workflowId = messageData["workflowId"]
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Set status if not present
        if "status" not in messageData:
            messageData["status"] = "step"  # Default status for intermediate messages

        # Ensure role and agentName are present
        if "role" not in messageData:
            messageData["role"] = "assistant" if messageData.get("agentName") else "user"
        if "agentName" not in messageData:
            messageData["agentName"] = ""

        # Default the progress context to the workflow's current position so
        # messages of continued workflows carry the right round/task/action
        if "roundNumber" not in messageData:
            messageData["roundNumber"] = workflow.currentRound
        if "taskNumber" not in messageData:
            messageData["taskNumber"] = workflow.currentTask
        if "actionNumber" not in messageData:
            messageData["actionNumber"] = workflow.currentAction

        # Use generic field separation based on ChatMessage model
        simpleFields, objectFields = self._separateObjectFields(ChatMessage, messageData)

        # Documents are stored in their own table. "or []" also covers an
        # explicit documents=None, which would otherwise fail AFTER the
        # message row has been written.
        documents_to_create = objectFields.get("documents") or []

        # Create message in normalized table using only simple fields
        createdMessage = self.db.recordCreate(ChatMessage, simpleFields)

        # Create documents in normalized documents table
        created_documents = []
        for doc_data in documents_to_create:
            # Normalize to plain dict before assignment
            if isinstance(doc_data, ChatDocument):
                doc_dict = doc_data.model_dump()
            elif isinstance(doc_data, dict):
                doc_dict = dict(doc_data)
            else:
                # Attempt to coerce to ChatDocument then dump
                try:
                    doc_dict = ChatDocument(**doc_data).model_dump()
                except Exception:
                    logger.error("Invalid document data type for message creation")
                    continue
            doc_dict["messageId"] = createdMessage["id"]
            created_doc = self.createDocument(doc_dict)
            if created_doc:
                created_documents.append(created_doc)

        # Convert to ChatMessage model
        chat_message = ChatMessage(
            id=createdMessage["id"],
            workflowId=createdMessage["workflowId"],
            parentMessageId=createdMessage.get("parentMessageId"),
            agentName=createdMessage.get("agentName"),
            documents=created_documents,
            documentsLabel=createdMessage.get("documentsLabel"),
            message=createdMessage.get("message"),
            role=createdMessage.get("role", "assistant"),
            status=createdMessage.get("status", "step"),
            sequenceNr=len(workflow.messages) + 1,  # Use messages list length for sequence number
            publishedAt=createdMessage.get("publishedAt", getUtcTimestamp()),
            stats=objectFields.get("stats"),  # Use stats from objectFields
            roundNumber=createdMessage.get("roundNumber"),
            taskNumber=createdMessage.get("taskNumber"),
            actionNumber=createdMessage.get("actionNumber"),
            success=createdMessage.get("success"),
            actionId=createdMessage.get("actionId"),
            actionMethod=createdMessage.get("actionMethod"),
            actionName=createdMessage.get("actionName")
        )

        # Hand the message to the debug logger (imported lazily here)
        from modules.shared.debugLogger import storeDebugMessageAndDocuments
        storeDebugMessageAndDocuments(chat_message, self.currentUser)
        return chat_message
    except Exception as e:
        logger.error(f"Error creating workflow message: {str(e)}")
        return None
def updateMessage(self, messageId: str, messageData: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Update a workflow message, creating it when it does not exist yet.

    Only simple fields are written to the message record; entries under
    "documents" are stored through createDocument() and linked to the
    message.

    Args:
        messageId: ID of the message to update (must not be empty).
        messageData: Fields to write. When the message does not exist,
            "workflowId" is required to create it.

    Returns:
        The record returned by the database connector, or None when the
        message does not exist and messageData has no "workflowId".

    Raises:
        ValueError: For every failure, including missing access or
            permission; the original exception is attached as the cause.
    """
    def storeDocuments(targetMessageId, documents_data):
        # Store each document in the normalized table, linked to the message.
        # Failures are logged and do not abort the message update.
        try:
            for doc_data in documents_data or []:
                # Normalize to dict before mutation
                if isinstance(doc_data, ChatDocument):
                    doc_dict = doc_data.model_dump()
                elif isinstance(doc_data, dict):
                    doc_dict = dict(doc_data)
                else:
                    try:
                        doc_dict = ChatDocument(**doc_data).model_dump()
                    except Exception:
                        logger.error("Invalid document data type for message update")
                        continue
                doc_dict["messageId"] = targetMessageId
                self.createDocument(doc_dict)
        except Exception as e:
            logger.error(f"Error updating message documents: {str(e)}")

    try:
        # Ensure messageId is provided
        if not messageId:
            logger.error("No messageId provided for updateMessage")
            raise ValueError("messageId cannot be empty")

        # Check if message exists in database
        messages = self.db.getRecordset(ChatMessage, recordFilter={"id": messageId})
        if not messages:
            logger.warning(f"Message with ID {messageId} does not exist in database")
            # Without a workflow the message cannot be created either
            if "workflowId" not in messageData:
                logger.error(f"Workflow ID missing for new message {messageId}")
                return None

            workflowId = messageData.get("workflowId")
            # Check workflow access
            workflow = self.getWorkflow(workflowId)
            if not workflow:
                raise PermissionError(f"No access to workflow {workflowId}")
            if not self._canModify(ChatWorkflow, workflowId):
                raise PermissionError(f"No permission to modify workflow {workflowId}")

            logger.info(f"Creating new message with ID {messageId} for workflow {workflowId}")
            # Store only simple fields (as createMessage does) and make sure
            # the record carries the requested ID
            simpleFields, objectFields = self._separateObjectFields(ChatMessage, messageData)
            if not simpleFields.get("id"):
                simpleFields["id"] = messageId
            createdMessage = self.db.recordCreate(ChatMessage, simpleFields)
            if 'documents' in objectFields:
                storeDocuments(simpleFields["id"], objectFields['documents'])
            return createdMessage

        # Update existing message
        existingMessage = messages[0]

        # Check workflow access
        workflowId = existingMessage.get("workflowId")
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            raise PermissionError(f"No access to workflow {workflowId}")
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Use generic field separation based on ChatMessage model
        simpleFields, objectFields = self._separateObjectFields(ChatMessage, messageData)

        # Ensure required fields present
        for key in ["role", "agentName"]:
            if key not in simpleFields and key not in existingMessage:
                simpleFields[key] = "assistant" if key == "role" else ""

        # Ensure ID is in the dataset
        if 'id' not in simpleFields:
            simpleFields['id'] = messageId

        # Convert createdAt to startedAt if needed
        if "createdAt" in simpleFields and "startedAt" not in simpleFields:
            simpleFields["startedAt"] = simpleFields["createdAt"]
            del simpleFields["createdAt"]

        # Update the message with simple fields only
        updatedMessage = self.db.recordModify(ChatMessage, messageId, simpleFields)

        # Documents are added to the normalized table after the update
        if 'documents' in objectFields:
            storeDocuments(messageId, objectFields['documents'])

        if not updatedMessage:
            logger.warning(f"Failed to update message {messageId}")
        return updatedMessage
    except Exception as e:
        logger.error(f"Error updating message {messageId}: {str(e)}", exc_info=True)
        # Keep the ValueError contract but preserve the original cause
        raise ValueError(f"Error updating message {messageId}: {str(e)}") from e
def deleteMessage(self, workflowId: str, messageId: str) -> bool:
    """
    Delete a workflow message together with its stats and document records.

    Document records are deleted, the referenced files are not.

    Args:
        workflowId: Workflow the message belongs to.
        messageId: ID of the message to delete.

    Returns:
        The result of deleting the message record, or False when the
        workflow is not accessible, the message is not part of it, or any
        step fails. Every exception, including the PermissionError raised
        below, is logged and reported as False.
    """
    def idOf(entry):
        # getMessages() returns ChatMessage models; dicts are tolerated too
        if isinstance(entry, dict):
            return entry.get("id")
        return getattr(entry, "id", None)

    try:
        # Check workflow access
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Check that the message belongs to this workflow
        messages = self.getMessages(workflowId)
        if not any(idOf(entry) == messageId for entry in messages):
            logger.warning(f"Message {messageId} for workflow {workflowId} not found")
            return False

        # CASCADE DELETE: Delete all related data first
        # 1. Delete message stats
        existing_stats = self.db.getRecordset(ChatStat, recordFilter={"messageId": messageId})
        for stat in existing_stats:
            self.db.recordDelete(ChatStat, stat["id"])

        # 2. Delete message documents (but NOT the files!)
        existing_docs = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        for doc in existing_docs:
            self.db.recordDelete(ChatDocument, doc["id"])

        # 3. Finally delete the message itself
        success = self.db.recordDelete(ChatMessage, messageId)
        return success
    except Exception as e:
        logger.error(f"Error deleting message {messageId}: {str(e)}")
        return False
def deleteFileFromMessage(self, workflowId: str, messageId: str, fileId: str) -> bool:
    """
    Remove the document record(s) of a message that reference a file.

    A document matches when its id or fileId equals fileId or contains it as
    a substring. Only document records are deleted, not the files.

    Args:
        workflowId: Workflow the message belongs to.
        messageId: Message whose documents are searched.
        fileId: Document or file ID to remove; must not be empty.

    Returns:
        True when at least one matching document was deleted, otherwise
        False (empty fileId, no access, no documents, no match, or an
        error). Every exception, including the PermissionError raised below,
        is logged and reported as False.
    """
    try:
        # An empty fileId is a substring of every ID and None equals every
        # unset fileId, so either would wipe unrelated documents.
        if fileId is None or str(fileId) == "":
            logger.warning(f"No matching file {fileId} found in message {messageId}")
            return False

        # Check workflow access
        workflow = self.getWorkflow(workflowId)
        if not workflow:
            logger.warning(f"No access to workflow {workflowId}")
            return False
        if not self._canModify(ChatWorkflow, workflowId):
            raise PermissionError(f"No permission to modify workflow {workflowId}")

        # Get documents for this message from normalized table
        documents = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        if not documents:
            logger.warning(f"No documents found for message {messageId}")
            return False

        # Find and delete the specific document
        removed = False
        for doc in documents:
            docId = doc.get("id")
            fileIdValue = doc.get("fileId")
            # Flexible matching approach: exact or substring match on either ID
            shouldRemove = (
                (docId == fileId) or
                (fileIdValue == fileId) or
                (isinstance(docId, str) and str(fileId) in docId) or
                (isinstance(fileIdValue, str) and str(fileId) in fileIdValue)
            )
            if shouldRemove:
                # Delete the document from normalized table
                success = self.db.recordDelete(ChatDocument, docId)
                if success:
                    removed = True
                else:
                    logger.warning(f"Failed to delete document {docId}")

        if not removed:
            logger.warning(f"No matching file {fileId} found in message {messageId}")
            return False
        return True
    except Exception as e:
        logger.error(f"Error removing file {fileId} from message {messageId}: {str(e)}")
        return False
# Document methods
def getDocuments(self, messageId: str) -> List[ChatDocument]:
    """
    Load the documents attached to a message.

    Returns:
        One ChatDocument per stored record, or an empty list when loading or
        model construction fails (the error is logged).
    """
    try:
        rows = self.db.getRecordset(ChatDocument, recordFilter={"messageId": messageId})
        loaded = []
        for row in rows:
            loaded.append(ChatDocument(**row))
        return loaded
    except Exception as e:
        logger.error(f"Error getting message documents: {str(e)}")
        return []
def createDocument(self, documentData: Dict[str, Any]) -> ChatDocument:
    """
    Validate and store one document record.

    Returns:
        The stored document as ChatDocument, or None when validation or the
        database write fails (the error is logged).
    """
    try:
        # Run the data through the model first so only valid fields are stored
        validated = ChatDocument(**documentData)
        payload = validated.model_dump()
        stored = self.db.recordCreate(ChatDocument, payload)
        return ChatDocument(**stored)
    except Exception as e:
        logger.error(f"Error creating message document: {str(e)}")
        return None
# Log methods
def getLogs(self, workflowId: str, pagination: Optional[PaginationParams] = None) -> Union[List[ChatLog], PaginatedResult]:
    """
    Returns logs for a workflow if user has access to the workflow.
    Supports optional pagination, sorting, and filtering.

    Args:
        workflowId: The workflow ID to get logs for
        pagination: Optional pagination parameters. If None, returns all items.

    Returns:
        If pagination is None: List[ChatLog]
        If pagination is provided: PaginatedResult with items and metadata.
        An empty result of the matching shape is returned when the workflow
        does not exist or is filtered out by the access check.
    """
    def emptyResult():
        # Shape of the "nothing to show" answer depends on the call style
        if pagination is None:
            return []
        return PaginatedResult(items=[], totalItems=0, totalPages=0)

    # Check workflow access first (without calling getWorkflow to avoid circular reference)
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows or not self._uam(ChatWorkflow, workflows):
        return emptyResult()

    # Get logs for this workflow from normalized table
    logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})

    # Convert raw logs to dict format for sorting/filtering
    logFields = (
        "id", "workflowId", "message", "type", "timestamp",
        "agentName", "status", "progress", "mandateId", "userId",
    )
    logDicts = []
    for log in logs:
        entry = {field: log.get(field) for field in logFields}
        # dict.get(key, default) only applies the default when the key is
        # absent; a stored None would break float() in the sort below.
        if entry["timestamp"] is None:
            entry["timestamp"] = getUtcTimestamp()
        logDicts.append(entry)

    # Apply default sorting by timestamp if no sort specified
    if pagination is None or not pagination.sort:
        logDicts.sort(key=lambda entry: float(entry["timestamp"]))

    # Apply filtering (if filters provided)
    if pagination and pagination.filters:
        logDicts = self._applyFilters(logDicts, pagination.filters)

    # Apply sorting (in order of sortFields)
    if pagination and pagination.sort:
        logDicts = self._applySorting(logDicts, pagination.sort)

    # If no pagination requested, return all items
    if pagination is None:
        return [ChatLog(**entry) for entry in logDicts]

    # Count total items after filters
    totalItems = len(logDicts)
    totalPages = math.ceil(totalItems / pagination.pageSize) if totalItems > 0 else 0

    # Apply pagination (skip/limit); page numbers are 1-based
    startIdx = (pagination.page - 1) * pagination.pageSize
    endIdx = startIdx + pagination.pageSize
    items = [ChatLog(**entry) for entry in logDicts[startIdx:endIdx]]

    return PaginatedResult(
        items=items,
        totalItems=totalItems,
        totalPages=totalPages
    )
def createLog(self, logData: Dict[str, Any]) -> Optional[ChatLog]:
    """
    Creates a log entry for a workflow if user has access.

    Missing "timestamp", "status" and "progress" values are filled with
    defaults before validation. The defaults are applied to a copy, so the
    caller's dict is left untouched and can be reused safely.

    Args:
        logData: Field values for the log entry; must contain "workflowId".

    Returns:
        The created ChatLog, or None when workflowId is missing, the
        workflow is not accessible or not modifiable, or validation fails.
    """
    # Check workflow access
    workflowId = logData.get("workflowId")
    if not workflowId:
        logger.error("No workflowId provided for createLog")
        return None
    workflow = self.getWorkflow(workflowId)
    if not workflow:
        logger.warning(f"No access to workflow {workflowId}")
        return None
    if not self._canModify(ChatWorkflow, workflowId):
        logger.warning(f"No permission to modify workflow {workflowId}")
        return None

    # Shallow copy: defaults added below must not leak into the caller's dict
    entry = dict(logData)

    # Make sure required fields are present
    if "timestamp" not in entry:
        entry["timestamp"] = getUtcTimestamp()

    # Add status information if not present
    if "status" not in entry and "type" in entry:
        entry["status"] = "error" if entry["type"] == "error" else "running"

    # Add progress information if not present (0.0 to 1.0 format)
    if "progress" not in entry:
        logType = entry.get("type")
        if logType in ("info", "warning"):
            entry["progress"] = 0.5  # Default middle progress
        elif logType == "error":
            entry["progress"] = 1.0  # Error state - completed (failed)

    # Validate log data against ChatLog model
    try:
        log_model = ChatLog(**entry)
    except Exception as e:
        logger.error(f"Invalid log data: {str(e)}")
        return None

    # Create log in normalized table
    createdLog = self.db.recordCreate(ChatLog, log_model)

    # Return validated ChatLog instance
    return ChatLog(**createdLog)
# Stats methods
def getStats(self, workflowId: str) -> List[ChatStat]:
    """
    Collect the statistics records of a workflow the user may access.

    Args:
        workflowId: ID of the workflow whose statistics are requested.

    Returns:
        ChatStat objects ordered by the records' "created_at" value, or an
        empty list when the workflow is unknown, not accessible, or has no
        statistics.
    """
    # Access check against the raw record (getWorkflow is avoided to prevent a circular reference)
    workflowRows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    accessibleRows = self._uam(ChatWorkflow, workflowRows) if workflowRows else None
    if not accessibleRows:
        return []

    statRows = self.db.getRecordset(ChatStat, recordFilter={"workflowId": workflowId})
    if not statRows:
        return []

    # Records without a "created_at" value sort first via the "" fallback
    orderedRows = sorted(statRows, key=lambda row: row.get("created_at", ""))
    return [ChatStat(**row) for row in orderedRows]
def createStat(self, statData: Dict[str, Any]) -> ChatStat:
    """
    Validate and store a new statistics record.

    Args:
        statData: Field values for the record; must contain "workflowId".

    Returns:
        The stored record as a ChatStat.

    Raises:
        ValueError: If "workflowId" is missing from statData.
        Exception: Any validation or database error is logged and re-raised.
    """
    try:
        if "workflowId" in statData:
            # Validate against the model, store it, and wrap the stored row
            storedRow = self.db.recordCreate(ChatStat, ChatStat(**statData))
            return ChatStat(**storedRow)
        raise ValueError("workflowId is required in statData")
    except Exception as failure:
        logger.error(f"Error creating workflow stat: {str(failure)}")
        raise
def getUnifiedChatData(self, workflowId: str, afterTimestamp: Optional[float] = None) -> Dict[str, Any]:
    """
    Returns unified chat data (messages, logs, stats) for a workflow in chronological order.
    Uses timestamp-based selective data transfer for efficient polling.

    Args:
        workflowId: The workflow to collect data for.
        afterTimestamp: When given, only items whose timestamp is strictly
            greater than this value are returned.

    Returns:
        {"items": [...]} where each entry is a dict with "type"
        ("message", "log" or "stat"), "createdAt" (the ordering timestamp)
        and "item" (the model object). The list is empty when the workflow
        does not exist or is not accessible.
    """
    # Check workflow access first
    workflows = self.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
    if not workflows:
        return {"items": []}
    if not self._uam(ChatWorkflow, workflows):
        return {"items": []}

    def resolveTimestamp(value):
        # A stored None must not reach the comparisons/sort below (None is
        # not orderable against numbers), so it is treated like a missing value.
        return getUtcTimestamp() if value is None else value

    def alreadySeen(timestamp) -> bool:
        return afterTimestamp is not None and timestamp <= afterTimestamp

    # Get all data types and filter in Python (PostgreSQL connector doesn't support $gt operators)
    items = []

    # Get messages
    messages = self.db.getRecordset(ChatMessage, recordFilter={"workflowId": workflowId})
    for msg in messages:
        msg_timestamp = resolveTimestamp(msg.get("publishedAt"))
        if alreadySeen(msg_timestamp):
            continue
        # Load documents only for messages that are actually returned
        documents = self.getDocuments(msg["id"])
        chat_message = ChatMessage(
            id=msg["id"],
            workflowId=msg["workflowId"],
            parentMessageId=msg.get("parentMessageId"),
            documents=documents,
            documentsLabel=msg.get("documentsLabel"),
            message=msg.get("message"),
            role=msg.get("role", "assistant"),
            status=msg.get("status", "step"),
            sequenceNr=msg.get("sequenceNr", 0),
            # Same value as used for ordering, so both always agree
            publishedAt=msg_timestamp,
            success=msg.get("success"),
            actionId=msg.get("actionId"),
            actionMethod=msg.get("actionMethod"),
            actionName=msg.get("actionName"),
            roundNumber=msg.get("roundNumber"),
            taskNumber=msg.get("taskNumber"),
            actionNumber=msg.get("actionNumber"),
            taskProgress=msg.get("taskProgress"),
            actionProgress=msg.get("actionProgress")
        )
        # Use publishedAt as the timestamp for chronological ordering
        items.append({
            "type": "message",
            "createdAt": msg_timestamp,
            "item": chat_message
        })

    # Get logs
    logs = self.db.getRecordset(ChatLog, recordFilter={"workflowId": workflowId})
    for log in logs:
        log_timestamp = resolveTimestamp(log.get("timestamp"))
        if alreadySeen(log_timestamp):
            continue
        items.append({
            "type": "log",
            "createdAt": log_timestamp,
            "item": ChatLog(**log)
        })

    # Get stats list (getStats performs its own access check)
    for stat in self.getStats(workflowId):
        stat_timestamp = resolveTimestamp(getattr(stat, "createdAt", None))
        if alreadySeen(stat_timestamp):
            continue
        items.append({
            "type": "stat",
            "createdAt": stat_timestamp,
            "item": stat
        })

    # Sort all items by createdAt timestamp for chronological order
    items.sort(key=lambda entry: entry["createdAt"])
    return {"items": items}
# ===== Automation Methods =====
def _computeAutomationStatus(self, automation: Dict[str, Any]) -> str:
"""Compute status field based on eventId presence"""
eventId = automation.get("eventId")
return "Running" if eventId else "Idle"
def getAllAutomationDefinitions(self, pagination: Optional[PaginationParams] = None) -> Union[List[Dict[str, Any]], PaginatedResult]:
    """
    List the automation definitions that pass the access filter.

    Every returned record gets a computed "status" and an "executionLogs"
    list (None is replaced by an empty list).

    Args:
        pagination: Optional pagination parameters. If None, all records
            are returned as a plain list.

    Returns:
        A list of automation dicts when pagination is None, otherwise a
        PaginatedResult holding the requested page plus totals counted
        after filtering.
    """
    visible = self._uam(AutomationDefinition, self.db.getRecordset(AutomationDefinition))

    # Enrich each record before any filtering/sorting so those can use the fields
    for entry in visible:
        entry["status"] = self._computeAutomationStatus(entry)
        if entry.get("executionLogs") is None:
            entry["executionLogs"] = []

    if pagination is None:
        return visible

    if pagination.filters:
        visible = self._applyFilters(visible, pagination.filters)
    if pagination.sort:
        visible = self._applySorting(visible, pagination.sort)

    # Totals refer to the filtered set, not to the returned page
    totalItems = len(visible)
    pageSize = pagination.pageSize
    totalPages = math.ceil(totalItems / pageSize) if totalItems > 0 else 0

    # Page numbers are 1-based
    offset = (pagination.page - 1) * pageSize
    return PaginatedResult(
        items=visible[offset:offset + pageSize],
        totalItems=totalItems,
        totalPages=totalPages
    )
def getAutomationDefinition(self, automationId: str) -> Optional[Dict[str, Any]]:
    """
    Fetch a single automation definition the user may access.

    Args:
        automationId: ID of the automation to load.

    Returns:
        The automation dict with a computed "status" and an "executionLogs"
        list (never None), or None when no accessible record exists or the
        lookup raises (the error is logged).
    """
    try:
        rows = self.db.getRecordset(AutomationDefinition, recordFilter={"id": automationId})
        accessible = self._uam(AutomationDefinition, rows)
        if accessible:
            record = accessible[0]
            record["status"] = self._computeAutomationStatus(record)
            if record.get("executionLogs") is None:
                record["executionLogs"] = []
            return record
        return None
    except Exception as err:
        logger.error(f"Error getting automation definition: {str(err)}")
        return None
def createAutomationDefinition(self, automationData: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates a new automation definition, then triggers an event sync.

    Note: automationData is updated in place with a generated "id" (when
    missing or empty) and the current mandateId (when the key is absent).

    Args:
        automationData: Field values for the new automation.

    Returns:
        The created record as a dict, extended with a computed "status" and
        an "executionLogs" list (never None).

    Raises:
        Exception: Any error raised while creating the record is logged and
            re-raised.
    """
    try:
        # Ensure ID is present
        if not automationData.get("id"):
            automationData["id"] = str(uuid.uuid4())

        # Ensure mandateId is set
        if "mandateId" not in automationData:
            automationData["mandateId"] = self.mandateId

        # Ensure database connector has correct userId context
        # The connector should have been initialized with userId, but ensure it's updated
        if self.userId and hasattr(self.db, 'updateContext'):
            try:
                self.db.updateContext(self.userId)
            except Exception as e:
                logger.warning(f"Could not update database context: {e}")

        # Note: _createdBy will be set automatically by connector's _saveRecord method
        # when _createdAt is not present. We don't need to set it manually here.

        # Use generic field separation; only the simple fields are stored here
        simpleFields, _ = self._separateObjectFields(AutomationDefinition, automationData)

        # Create automation in database
        createdAutomation = self.db.recordCreate(AutomationDefinition, simpleFields)

        # Compute status
        createdAutomation["status"] = self._computeAutomationStatus(createdAutomation)

        # Ensure executionLogs is always a list, not None
        if createdAutomation.get("executionLogs") is None:
            createdAutomation["executionLogs"] = []

        # Trigger sync in the background (don't wait). asyncio.create_task()
        # raises RuntimeError without a running loop, which would report the
        # already-stored record as a failure, so that case is only logged.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("No running event loop; automation event sync was not scheduled")
        else:
            syncTask = loop.create_task(self.syncAutomationEvents())
            # The event loop keeps only weak references to tasks; hold a strong
            # one until the task finishes so it cannot be collected early.
            if not hasattr(self, "_pendingSyncTasks"):
                self._pendingSyncTasks = set()
            self._pendingSyncTasks.add(syncTask)
            syncTask.add_done_callback(self._pendingSyncTasks.discard)

        return createdAutomation
    except Exception as e:
        logger.error(f"Error creating automation definition: {str(e)}")
        raise
def updateAutomationDefinition(self, automationId: str, automationData: Dict[str, Any]) -> Dict[str, Any]:
    """
    Updates an automation definition, then triggers an event sync.

    Args:
        automationId: ID of the automation to update.
        automationData: Field values to change.

    Returns:
        The updated record as a dict, extended with a computed "status" and
        an "executionLogs" list (never None).

    Raises:
        PermissionError: If the automation is not accessible or may not be
            modified by the current user.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Check access
        existing = self.getAutomationDefinition(automationId)
        if not existing:
            raise PermissionError(f"No access to automation {automationId}")
        if not self._canModify(AutomationDefinition, automationId):
            raise PermissionError(f"No permission to modify automation {automationId}")

        # Use generic field separation; only the simple fields are stored here
        simpleFields, _ = self._separateObjectFields(AutomationDefinition, automationData)

        # Update automation in database
        updatedAutomation = self.db.recordModify(AutomationDefinition, automationId, simpleFields)

        # Compute status
        updatedAutomation["status"] = self._computeAutomationStatus(updatedAutomation)

        # Ensure executionLogs is always a list, not None
        if updatedAutomation.get("executionLogs") is None:
            updatedAutomation["executionLogs"] = []

        # Trigger sync in the background (don't wait). asyncio.create_task()
        # raises RuntimeError without a running loop, which would report the
        # already-stored update as a failure, so that case is only logged.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("No running event loop; automation event sync was not scheduled")
        else:
            syncTask = loop.create_task(self.syncAutomationEvents())
            # The event loop keeps only weak references to tasks; hold a strong
            # one until the task finishes so it cannot be collected early.
            if not hasattr(self, "_pendingSyncTasks"):
                self._pendingSyncTasks = set()
            self._pendingSyncTasks.add(syncTask)
            syncTask.add_done_callback(self._pendingSyncTasks.discard)

        return updatedAutomation
    except Exception as e:
        logger.error(f"Error updating automation definition: {str(e)}")
        raise
def deleteAutomationDefinition(self, automationId: str) -> bool:
    """
    Deletes an automation definition, then triggers an event sync.

    A registered scheduler event of the automation is removed first;
    failures of that removal are logged and do not stop the deletion.

    Args:
        automationId: ID of the automation to delete.

    Returns:
        True once the record has been deleted.

    Raises:
        PermissionError: If the automation is not accessible or may not be
            modified by the current user.
        Exception: Any other error is logged and re-raised.
    """
    try:
        # Check access
        existing = self.getAutomationDefinition(automationId)
        if not existing:
            raise PermissionError(f"No access to automation {automationId}")
        if not self._canModify(AutomationDefinition, automationId):
            raise PermissionError(f"No permission to delete automation {automationId}")

        # Remove event if exists
        if existing.get("eventId"):
            from modules.shared.eventManagement import eventManager
            try:
                eventManager.remove(existing["eventId"])
            except Exception as e:
                logger.warning(f"Error removing event {existing['eventId']}: {str(e)}")

        # Delete automation from database
        self.db.recordDelete(AutomationDefinition, automationId)

        # Trigger sync in the background (don't wait). asyncio.create_task()
        # raises RuntimeError without a running loop, which would report the
        # already-completed deletion as a failure, so that case is only logged.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logger.warning("No running event loop; automation event sync was not scheduled")
        else:
            syncTask = loop.create_task(self.syncAutomationEvents())
            # The event loop keeps only weak references to tasks; hold a strong
            # one until the task finishes so it cannot be collected early.
            if not hasattr(self, "_pendingSyncTasks"):
                self._pendingSyncTasks = set()
            self._pendingSyncTasks.add(syncTask)
            syncTask.add_done_callback(self._pendingSyncTasks.discard)

        return True
    except Exception as e:
        logger.error(f"Error deleting automation definition: {str(e)}")
        raise
def _replacePlaceholders(self, template: str, placeholders: Dict[str, str]) -> str:
"""Replace placeholders in template with actual values. Placeholder format: {{KEY:PLACEHOLDER_NAME}}"""
result = template
for placeholderName, value in placeholders.items():
pattern = f"{{{{KEY:{placeholderName}}}}}"
result = result.replace(pattern, str(value))
return result
def _parseScheduleToCron(self, schedule: str) -> Dict[str, Any]:
"""Parse schedule string to cron kwargs for APScheduler"""
parts = schedule.split()
if len(parts) != 5:
raise ValueError(f"Invalid schedule format: {schedule}")
return {
"minute": parts[0],
"hour": parts[1],
"day": parts[2],
"month": parts[3],
"day_of_week": parts[4]
}
async def executeAutomation(self, automationId: str) -> ChatWorkflow:
    """
    Execute automation workflow immediately (test mode) with placeholder replacement.

    Steps: load the automation, fill the template placeholders to obtain the
    plan JSON, resolve the creator user (repairing a missing _createdBy with
    the current user), start the workflow via chatStart as that user, and
    append an execution log entry to the automation (last 50 are kept).

    Args:
        automationId: ID of the automation to execute.

    Returns:
        The workflow returned by chatStart.

    Raises:
        ValueError: If the automation or its creator user cannot be found,
            or a missing _createdBy could not be repaired.
        Exception: Any other failure (e.g. invalid plan JSON) is recorded in
            the automation's execution log and re-raised.
    """
    executionStartTime = getUtcTimestamp()
    executionLog = {
        "timestamp": executionStartTime,
        "workflowId": None,
        "status": "running",
        "messages": []
    }

    def persistExecutionLog(automationRecord: Dict[str, Any]) -> None:
        """Append this run's log to the record's history and store the last 50."""
        executionLogs = automationRecord.get("executionLogs") or []
        executionLogs.append(executionLog)
        # Keep only last 50 executions
        if len(executionLogs) > 50:
            executionLogs = executionLogs[-50:]
        self.db.recordModify(
            AutomationDefinition,
            automationId,
            {"executionLogs": executionLogs}
        )

    try:
        # 1. Load automation definition
        automation = self.getAutomationDefinition(automationId)
        if not automation:
            raise ValueError(f"Automation {automationId} not found")
        executionLog["messages"].append(f"Started execution at {executionStartTime}")

        # 2. Replace placeholders in template to generate plan
        # "or" also covers keys that are present but stored as None
        template = automation.get("template") or ""
        placeholders = automation.get("placeholders") or {}
        planJson = self._replacePlaceholders(template, placeholders)
        plan = json.loads(planJson)
        executionLog["messages"].append("Template placeholders replaced successfully")

        # 3. Get user who created automation
        creator_user_id = automation.get("_createdBy")
        # If _createdBy is missing, try to fix it by setting it to current user
        # This handles automations created before _createdBy was required
        if not creator_user_id:
            logger.warning(f"Automation {automationId} has no creator user, setting to current user {self.userId}")
            try:
                # Update the automation to set _createdBy
                self.db.recordModify(
                    AutomationDefinition,
                    automationId,
                    {"_createdBy": self.userId}
                )
                creator_user_id = self.userId
                automation["_createdBy"] = self.userId
                logger.info(f"Fixed automation {automationId} by setting _createdBy to {self.userId}")
                executionLog["messages"].append(f"Fixed missing _createdBy field, set to user {self.userId}")
            except Exception as e:
                logger.error(f"Error fixing automation {automationId}: {str(e)}")
                raise ValueError(f"Automation {automationId} has no creator user and could not be fixed") from e

        # Get user from database
        from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface
        appInterface = getAppInterface(self.currentUser)
        creator_user = appInterface.getUser(creator_user_id)
        if not creator_user:
            raise ValueError(f"Creator user {creator_user_id} not found")
        executionLog["messages"].append(f"Using creator user: {creator_user_id}")

        # 4. Create UserInputRequest from plan
        # Embed plan JSON in prompt for TemplateMode to extract
        promptText = self._planToPrompt(plan)
        planJson = json.dumps(plan)
        # Embed plan as JSON comment so TemplateMode can extract it
        promptWithPlan = f"{promptText}\n\n<!--TEMPLATE_PLAN_START-->\n{planJson}\n<!--TEMPLATE_PLAN_END-->"
        userInput = UserInputRequest(
            prompt=promptWithPlan,
            listFileId=[],
            userLanguage=creator_user.language or "en"
        )
        executionLog["messages"].append("Starting workflow execution")

        # 5. Start workflow using chatStart
        from modules.features.chatPlayground.mainChatPlayground import chatStart
        workflow = await chatStart(
            currentUser=creator_user,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_AUTOMATION,
            workflowId=None
        )
        executionLog["workflowId"] = workflow.id
        executionLog["status"] = "completed"
        executionLog["messages"].append(f"Workflow {workflow.id} started successfully")

        # Also store plan in module-level cache as backup (keyed by workflow ID)
        from modules.workflows.processing.modes import modeAutomation
        if not hasattr(modeAutomation, '_templatePlanCache'):
            modeAutomation._templatePlanCache = {}
        modeAutomation._templatePlanCache[workflow.id] = plan
        logger.info(f"Stored template plan for workflow {workflow.id} (cache + prompt) with {len(plan.get('tasks', []))} tasks")

        # Update automation with execution log
        persistExecutionLog(automation)
        return workflow
    except Exception as e:
        # Log error to execution log
        executionLog["status"] = "error"
        executionLog["messages"].append(f"Error: {str(e)}")
        # Update automation with execution log even on error; the record is
        # reloaded because the failure may have happened before it was loaded
        try:
            automation = self.getAutomationDefinition(automationId)
            if automation:
                persistExecutionLog(automation)
        except Exception as logError:
            logger.error(f"Error saving execution log: {str(logError)}")
        raise
def _planToPrompt(self, plan: Dict) -> str:
"""Convert plan structure to prompt string for workflow execution"""
return plan.get("userMessage", plan.get("overview", "Execute automation workflow"))
async def syncAutomationEvents(self) -> Dict[str, Any]:
    """
    Automation event handler - syncs scheduler with all active automations.

    For every accessible automation:
    - active with a schedule: the cron job "automation.<id>" is (re)registered
      and the eventId stored on the record;
    - inactive, or without a schedule: an existing event is removed and the
      stored eventId cleared, so no stale job keeps firing.

    Errors of a single automation are logged and do not stop the sync of
    the remaining ones.

    Returns:
        {"synced": <number of registered events>,
         "events": {automationId: eventId, ...}}
    """
    from modules.shared.eventManagement import eventManager

    # Get all automation definitions (for current mandate)
    allAutomations = self.db.getRecordset(AutomationDefinition)
    filtered = self._uam(AutomationDefinition, allAutomations)

    registered_events = {}
    for automation in filtered:
        automation_id = automation.get("id")
        is_active = automation.get("active", False)
        current_event_id = automation.get("eventId")
        schedule = automation.get("schedule")

        if not schedule:
            logger.warning(f"Automation {automation_id} has no schedule, skipping")

        try:
            if not is_active or not schedule:
                # Nothing to register. Unregistering must not depend on the
                # schedule being present or parsable, otherwise a previously
                # registered job could never be removed.
                if current_event_id:
                    try:
                        eventManager.remove(current_event_id)
                        self.db.recordModify(
                            AutomationDefinition,
                            automation_id,
                            {"eventId": None}
                        )
                    except Exception as e:
                        logger.warning(f"Error removing event {current_event_id}: {str(e)}")
                continue

            # Parse schedule to cron kwargs (only needed for registration)
            cron_kwargs = self._parseScheduleToCron(schedule)

            # Remove existing event if present (handles schedule changes)
            if current_event_id:
                try:
                    eventManager.remove(current_event_id)
                except Exception as e:
                    logger.warning(f"Error removing old event {current_event_id}: {str(e)}")

            # Register new event
            new_event_id = f"automation.{automation_id}"
            # Create event handler function
            handler = self._createAutomationEventHandler(automation_id)
            # Register cron job
            eventManager.registerCron(
                jobId=new_event_id,
                func=handler,
                cronKwargs=cron_kwargs,
                replaceExisting=True
            )

            # Update automation with new eventId
            if current_event_id != new_event_id:
                self.db.recordModify(
                    AutomationDefinition,
                    automation_id,
                    {"eventId": new_event_id}
                )
            registered_events[automation_id] = new_event_id
        except Exception as e:
            logger.error(f"Error syncing automation {automation_id}: {str(e)}")

    return {
        "synced": len(registered_events),
        "events": registered_events
    }
def _createAutomationEventHandler(self, automationId: str):
    """
    Build the coroutine function that the scheduler calls for one automation.

    Args:
        automationId: ID of the automation the handler is bound to.

    Returns:
        An argument-less async function. When called it loads the automation
        through the "event" user, resolves the creator user and runs
        executeAutomation in the creator's context. Every failure is logged;
        the handler itself never raises.
    """
    async def handler():
        try:
            # Imported lazily inside the handler, as in the rest of this module
            from modules.interfaces.interfaceDbAppObjects import getRootInterface
            from modules.interfaces.interfaceDbAppObjects import getInterface as getAppInterface
            from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface

            # The "event" user is used to read the automation definition
            serviceUser = getRootInterface().getUserByUsername("event")
            if not serviceUser:
                logger.error("Could not get event user for automation execution")
                return

            serviceChat = getChatInterface(serviceUser)
            definition = serviceChat.getAutomationDefinition(automationId)
            if not (definition and definition.get("active")):
                logger.warning(f"Automation {automationId} not found or not active, skipping execution")
                return

            ownerId = definition.get("_createdBy")
            if not ownerId:
                logger.error(f"Automation {automationId} has no creator user")
                return

            owner = getAppInterface(serviceUser).getUser(ownerId)
            if not owner:
                logger.error(f"Creator user {ownerId} not found for automation {automationId}")
                return

            # Run the automation with the creator's own interface/context
            ownerChat = getChatInterface(owner)
            await ownerChat.executeAutomation(automationId)
            logger.info(f"Successfully executed automation {automationId} as user {ownerId}")
        except Exception as err:
            logger.error(f"Error executing automation {automationId}: {str(err)}")
    return handler
def getInterface(currentUser: Optional[User] = None) -> 'ChatObjects':
    """
    Return the ChatObjects instance belonging to the given user.

    Instances are cached per "<mandateId>_<userId>" key; a new ChatObjects
    is only constructed the first time a key is requested.

    Args:
        currentUser: The user the interface is requested for.

    Raises:
        ValueError: If no user is given.
    """
    if not currentUser:
        raise ValueError("Invalid user context: user is required")

    cacheKey = f"{currentUser.mandateId}_{currentUser.id}"
    try:
        return _chatInterfaces[cacheKey]
    except KeyError:
        # First request for this user context: build and remember the instance
        instance = ChatObjects(currentUser)
        _chatInterfaces[cacheKey] = instance
        return instance