End-to-end workflow with Outlook

This commit is contained in:
ValueOn AG 2025-08-17 20:41:37 +02:00
parent 2d269137a5
commit 7d21146b6a
12 changed files with 1007 additions and 320 deletions

View file

@ -41,8 +41,21 @@ class DocumentGenerator:
if mime_type == "application/octet-stream":
content = getattr(doc, 'content', '')
mime_type = detectMimeTypeFromContent(content, doc.filename, self.service)
# Add result label to filename for document objects too
base_filename = doc.filename
if hasattr(action, 'execResultLabel') and action.execResultLabel:
result_label = action.execResultLabel.strip()
if result_label:
# Check if filename already starts with resultLabel to avoid duplication
if not base_filename.startswith(f"{result_label}-"):
base_filename = f"{result_label}-{base_filename}"
logger.info(f"Added resultLabel '{result_label}' as prefix to document object filename: {base_filename}")
else:
logger.info(f"Document object filename already has resultLabel prefix: {base_filename}")
return {
'filename': doc.filename,
'filename': base_filename,
'fileSize': getattr(doc, 'fileSize', 0),
'mimeType': mime_type,
'content': getattr(doc, 'content', ''),
@ -50,8 +63,33 @@ class DocumentGenerator:
}
elif isinstance(doc, dict):
# Dictionary format document - handle both 'documentName' and 'filename' keys
filename = doc.get('documentName', doc.get('filename', \
f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"))
base_filename = doc.get('documentName', doc.get('filename', ''))
# Debug logging for resultLabel
if hasattr(action, 'execResultLabel'):
logger.info(f"Action {action.execMethod}.{action.execAction} has execResultLabel: '{action.execResultLabel}' (type: {type(action.execResultLabel)})")
else:
logger.info(f"Action {action.execMethod}.{action.execAction} has NO execResultLabel attribute")
# If no filename provided, generate one with action info
if not base_filename:
base_filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
# ALWAYS add result label to filename for better document selection
# This ensures consistent naming regardless of whether filename was provided or generated
if hasattr(action, 'execResultLabel') and action.execResultLabel:
result_label = action.execResultLabel.strip()
if result_label:
# Check if filename already starts with resultLabel to avoid duplication
if not base_filename.startswith(f"{result_label}-"):
base_filename = f"{result_label}-{base_filename}"
logger.info(f"Added resultLabel '{result_label}' as prefix to filename: {base_filename}")
else:
logger.info(f"Filename already has resultLabel prefix: {base_filename}")
else:
logger.info(f"No resultLabel available for action {action.execMethod}.{action.execAction}")
filename = base_filename
mimeType = doc.get('mimeType', 'application/octet-stream')
# Handle documentData structure - it might be a dict with 'content' key or direct content
@ -85,7 +123,23 @@ class DocumentGenerator:
else:
# Unknown document type
logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}")
filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
base_filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
# ALWAYS add result label to filename for better document selection
# This ensures consistent naming regardless of document type
if hasattr(action, 'execResultLabel') and action.execResultLabel:
result_label = action.execResultLabel.strip()
if result_label:
# Check if filename already starts with resultLabel to avoid duplication
if not base_filename.startswith(f"{result_label}-"):
base_filename = f"{result_label}-{base_filename}"
logger.info(f"Added resultLabel '{result_label}' as prefix to fallback filename: {base_filename}")
else:
logger.info(f"Fallback filename already has resultLabel prefix: {base_filename}")
else:
logger.info(f"No resultLabel available for action {action.execMethod}.{action.execAction}")
filename = base_filename
mimeType = detectMimeTypeFromContent(doc, filename, self.service)
return {
'filename': filename,

View file

@ -368,6 +368,45 @@ class HandlingTasks:
continue
else:
logger.error(f"=== TASK {task_index or '?'} FAILED: {task_step.objective} after {attempt+1} attempts ===")
# Create user-facing error message for task failure
error_message = f"❌ Task {task_index or '?'} - '{task_step.objective}' failed after {attempt+1} attempts\n\n"
error_message += f"Objective: {task_step.objective}\n\n"
# Add specific error details if available
if error:
error_message += f"Error: {error}\n\n"
# Add retry information
error_message += f"Attempts: {attempt+1}\n"
error_message += f"Status: Will retry automatically\n\n"
error_message += "The system will attempt to retry this task. Please wait..."
# Create workflow message for user
message_data = {
"workflowId": workflow.id,
"role": "assistant",
"message": error_message,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": datetime.now(UTC).isoformat(),
"actionId": None,
"actionMethod": "task",
"actionName": "task_retry",
"documentsLabel": None,
"documents": []
}
try:
message = self.chatInterface.createWorkflowMessage(message_data)
if message:
workflow.messages.append(message)
logger.info(f"Created user-facing retry message for failed task: {task_step.objective}")
else:
logger.error(f"Failed to create user-facing retry message for failed task: {task_step.objective}")
except Exception as e:
logger.error(f"Error creating user-facing retry message: {str(e)}")
return TaskResult(
taskId=task_step.id,
status=TaskStatus.FAILED,
@ -376,6 +415,45 @@ class HandlingTasks:
error=error
)
logger.error(f"=== TASK {task_index or '?'} FAILED AFTER ALL RETRIES: {task_step.objective} ===")
# Create user-facing error message for task failure
error_message = f"❌ Task {task_index or '?'} - '{task_step.objective}' failed after all retries\n\n"
error_message += f"Objective: {task_step.objective}\n\n"
# Add specific error details if available
if error and error != "Task failed after all retries.":
error_message += f"Error: {error}\n\n"
# Add retry information
error_message += f"Retries attempted: {retry_context.retry_count if retry_context else 'Unknown'}\n"
error_message += f"Status: Task failed permanently\n\n"
error_message += "Please check the connection and try again, or contact support if the issue persists."
# Create workflow message for user
message_data = {
"workflowId": workflow.id,
"role": "assistant",
"message": error_message,
"status": "step",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": datetime.now(UTC).isoformat(),
"actionId": None,
"actionMethod": "task",
"actionName": "task_failure",
"documentsLabel": None,
"documents": []
}
try:
message = self.chatInterface.createWorkflowMessage(message_data)
if message:
workflow.messages.append(message)
logger.info(f"Created user-facing error message for failed task: {task_step.objective}")
else:
logger.error(f"Failed to create user-facing error message for failed task: {task_step.objective}")
except Exception as e:
logger.error(f"Error creating user-facing error message: {str(e)}")
return TaskResult(
taskId=task_step.id,
status=TaskStatus.FAILED,

View file

@ -383,6 +383,22 @@ class DatabaseConnector:
self._tablesCache = {}
self._tableMetadataCache = {}
def clearTableCache(self, table: str) -> None:
"""Clears cache for a specific table to ensure fresh data."""
if table in self._tablesCache:
del self._tablesCache[table]
logger.debug(f"Cleared cache for table: {table}")
if table in self._tableMetadataCache:
del self._tableMetadataCache[table]
logger.debug(f"Cleared metadata cache for table: {table}")
def clearAllCache(self) -> None:
"""Clears all cache to ensure completely fresh data."""
self._tablesCache.clear()
self._tableMetadataCache.clear()
logger.debug("Cleared all database cache")
# Public API
def getTables(self) -> List[str]:

View file

@ -235,6 +235,9 @@ class AppAccess:
"lastActivity": datetime.now()
})
# Clear cache to ensure fresh data
self.db.clearTableCache("sessions")
return True
except Exception as e:

View file

@ -194,7 +194,11 @@ class AppObjects:
Boolean indicating permission
"""
return self.access.canModify(table, recordId)
def _clearTableCache(self, table: str) -> None:
"""Clears the cache for a specific table to ensure fresh data."""
self.db.clearTableCache(table)
def getInitialId(self, table: str) -> Optional[str]:
"""Returns the initial ID for a table."""
return self.db.getInitialId(table)
@ -352,6 +356,9 @@ class AppObjects:
# Save to connections table
self.db.recordCreate("connections", connection.to_dict())
# Clear cache to ensure fresh data
self._clearTableCache("connections")
return connection
except Exception as e:
@ -372,6 +379,9 @@ class AppObjects:
# Delete connection
self.db.recordDelete("connections", connectionId)
# Clear cache to ensure fresh data
self._clearTableCache("connections")
except Exception as e:
logger.error(f"Error removing user connection: {str(e)}")
raise ValueError(f"Failed to remove user connection: {str(e)}")
@ -379,8 +389,7 @@ class AppObjects:
def authenticateLocalUser(self, username: str, password: str) -> Optional[User]:
"""Authenticates a user by username and password using local authentication."""
# Clear the users table from cache and reload it
if "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
self._clearTableCache("users")
# Get user by username
user = self.getUserByUsername(username)
@ -445,6 +454,9 @@ class AppObjects:
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create user record")
# Clear cache to ensure fresh data
self._clearTableCache("users")
# Add external connection if provided
if externalId and externalUsername:
self.addUserConnection(
@ -460,11 +472,8 @@ class AppObjects:
if not createdUser or len(createdUser) == 0:
raise ValueError("Failed to retrieve created user")
# Clear both table and metadata caches
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
if hasattr(self.db, '_tableMetadataCache') and "users" in self.db._tableMetadataCache:
del self.db._tableMetadataCache["users"]
# Clear cache to ensure fresh data (already done above)
# No need for additional cache clearing since _clearTableCache("users") was called
return User.from_dict(createdUser[0])
@ -491,6 +500,9 @@ class AppObjects:
# Update user record
self.db.recordModify("users", userId, updatedUser.to_dict())
# Clear cache to ensure fresh data
self._clearTableCache("users")
# Get updated user
updatedUser = self.getUser(userId)
if not updatedUser:
@ -562,11 +574,8 @@ class AppObjects:
if not success:
raise ValueError(f"Failed to delete user {userId}")
# Clear both table and metadata caches
if hasattr(self.db, '_tablesCache') and "users" in self.db._tablesCache:
del self.db._tablesCache["users"]
if hasattr(self.db, '_tableMetadataCache') and "users" in self.db._tableMetadataCache:
del self.db._tableMetadataCache["users"]
# Clear cache to ensure fresh data
self._clearTableCache("users")
logger.info(f"User {userId} successfully deleted")
return True
@ -611,6 +620,9 @@ class AppObjects:
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create mandate record")
# Clear cache to ensure fresh data
self._clearTableCache("mandates")
return Mandate.from_dict(createdRecord)
def updateMandate(self, mandateId: str, updateData: Dict[str, Any]) -> Mandate:
@ -637,6 +649,9 @@ class AppObjects:
# Update mandate record
self.db.recordModify("mandates", mandateId, updatedMandate.to_dict())
# Clear cache to ensure fresh data
self._clearTableCache("mandates")
# Get updated mandate
updatedMandate = self.getMandate(mandateId)
if not updatedMandate:
@ -665,7 +680,12 @@ class AppObjects:
raise ValueError(f"Cannot delete mandate {mandateId} with existing users")
# Delete mandate
return self.db.recordDelete("mandates", mandateId)
success = self.db.recordDelete("mandates", mandateId)
# Clear cache to ensure fresh data
self._clearTableCache("mandates")
return success
except Exception as e:
logger.error(f"Error deleting mandate: {str(e)}")
@ -747,6 +767,9 @@ class AppObjects:
# Save to database
self.db.recordCreate("tokens", token_dict)
# Clear cache to ensure fresh data
self._clearTableCache("tokens")
logger.debug(f"Token saved for user {self.currentUser.id} with authority {token.authority}")
except Exception as e:
@ -792,7 +815,10 @@ class AppObjects:
# Delete each token
for token in tokens:
self.db.recordDelete("tokens", token["id"])
# Clear cache to ensure fresh data
self._clearTableCache("tokens")
except Exception as e:
logger.error(f"Error deleting token: {str(e)}")
raise

View file

@ -121,6 +121,10 @@ class ChatObjects:
"""Delegate to access control module."""
return self.access.canModify(table, recordId)
def _clearTableCache(self, table: str) -> None:
"""Clears the cache for a specific table to ensure fresh data."""
self.db.clearTableCache(table)
# Utilities
def getInitialId(self, table: str) -> Optional[str]:
@ -196,6 +200,9 @@ class ChatObjects:
# Create workflow in database
created = self.db.recordCreate("workflows", workflowData)
# Clear cache to ensure fresh data
self._clearTableCache("workflows")
# Convert to ChatWorkflow model
return ChatWorkflow(
id=created["id"],
@ -226,6 +233,9 @@ class ChatObjects:
# Update workflow in database
updated = self.db.recordModify("workflows", workflowId, workflowData)
# Clear cache to ensure fresh data
self._clearTableCache("workflows")
# Convert to ChatWorkflow model
return ChatWorkflow(
id=updated["id"],
@ -256,7 +266,12 @@ class ChatObjects:
raise PermissionError(f"No permission to delete workflow {workflowId}")
# Delete workflow
return self.db.recordDelete("workflows", workflowId)
success = self.db.recordDelete("workflows", workflowId)
# Clear cache to ensure fresh data
self._clearTableCache("workflows")
return success
# Workflow Messages
@ -328,6 +343,9 @@ class ChatObjects:
# Create message in database
createdMessage = self.db.recordCreate("workflowMessages", messageData)
# Clear cache to ensure fresh data
self._clearTableCache("workflowMessages")
# Convert to ChatMessage model
return ChatMessage(
id=createdMessage["id"],
@ -411,6 +429,9 @@ class ChatObjects:
updatedMessage = self.db.recordModify("workflowMessages", messageId, messageData)
if updatedMessage:
logger.debug(f"Message {messageId} updated successfully")
# Clear cache to ensure fresh data
self._clearTableCache("workflowMessages")
else:
logger.warning(f"Failed to update message {messageId}")
@ -440,7 +461,12 @@ class ChatObjects:
return False
# Delete the message from the database
return self.db.recordDelete("workflowMessages", messageId)
success = self.db.recordDelete("workflowMessages", messageId)
# Clear cache to ensure fresh data
self._clearTableCache("workflowMessages")
return success
except Exception as e:
logger.error(f"Error deleting message {messageId}: {str(e)}")
return False
@ -709,6 +735,9 @@ class ChatObjects:
# Create log in database
createdLog = self.db.recordCreate("workflowLogs", log_model.to_dict())
# Clear cache to ensure fresh data
self._clearTableCache("workflowLogs")
# Return validated ChatLog instance
return ChatLog(**createdLog)
@ -1078,6 +1107,9 @@ class ChatObjects:
# Create task in database
createdTask = self.db.recordCreate("tasks", taskData)
# Clear cache to ensure fresh data
self._clearTableCache("tasks")
# Convert to TaskItem model
task = TaskItem(
id=createdTask["id"],
@ -1130,6 +1162,9 @@ class ChatObjects:
# Update task in database
updatedTask = self.db.recordModify("tasks", taskId, taskData)
# Clear cache to ensure fresh data
self._clearTableCache("tasks")
# Convert to TaskItem model
return TaskItem(
id=updatedTask["id"],
@ -1178,6 +1213,9 @@ class ChatObjects:
if taskId in workflowTasks:
workflowTasks.remove(taskId)
self.updateWorkflow(task.workflowId, {"tasks": workflowTasks})
# Clear cache to ensure fresh data
self._clearTableCache("tasks")
return True
return False

View file

@ -235,6 +235,10 @@ class ComponentObjects:
"""Delegate to access control module."""
return self.access.canModify(table, recordId)
def _clearTableCache(self, table: str) -> None:
"""Clears the cache for a specific table to ensure fresh data."""
self.db.clearTableCache(table)
# Utilities
def getInitialId(self, table: str) -> Optional[str]:
@ -279,6 +283,9 @@ class ComponentObjects:
if not createdRecord or not createdRecord.get("id"):
raise ValueError("Failed to create prompt record")
# Clear cache to ensure fresh data
self._clearTableCache("prompts")
return createdRecord
def updatePrompt(self, promptId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
@ -292,6 +299,9 @@ class ComponentObjects:
# Update prompt record directly with the update data
self.db.recordModify("prompts", promptId, updateData)
# Clear cache to ensure fresh data
self._clearTableCache("prompts")
# Get updated prompt
updatedPrompt = self.getPrompt(promptId)
if not updatedPrompt:
@ -313,7 +323,13 @@ class ComponentObjects:
if not self._canModify("prompts", promptId):
raise PermissionError(f"No permission to delete prompt {promptId}")
return self.db.recordDelete("prompts", promptId)
# Delete prompt
success = self.db.recordDelete("prompts", promptId)
# Clear cache to ensure fresh data
self._clearTableCache("prompts")
return success
# File Utilities
@ -528,6 +544,10 @@ class ComponentObjects:
# Store in database
self.db.recordCreate("files", fileItem.to_dict())
# Clear cache to ensure fresh data
self._clearTableCache("files")
return fileItem
def updateFile(self, fileId: str, updateData: Dict[str, Any]) -> Dict[str, Any]:
@ -545,7 +565,12 @@ class ComponentObjects:
updateData["filename"] = self._generateUniqueFilename(updateData["filename"], fileId)
# Update file
return self.db.recordModify("files", fileId, updateData)
success = self.db.recordModify("files", fileId, updateData)
# Clear cache to ensure fresh data
self._clearTableCache("files")
return success
def deleteFile(self, fileId: str) -> bool:
"""Deletes a file if user has access."""
@ -576,7 +601,12 @@ class ComponentObjects:
logger.warning(f"Error deleting FileData for file {fileId}: {str(e)}")
# Delete the FileItem entry
return self.db.recordDelete("files", fileId)
success = self.db.recordDelete("files", fileId)
# Clear cache to ensure fresh data
self._clearTableCache("files")
return success
except FileNotFoundError as e:
raise
@ -634,6 +664,10 @@ class ComponentObjects:
}
self.db.recordCreate("fileData", fileDataObj)
# Clear cache to ensure fresh data
self._clearTableCache("fileData")
logger.debug(f"Successfully stored data for file {fileId} (base64Encoded: {base64Encoded})")
return True
except Exception as e:
@ -668,8 +702,25 @@ class ComponentObjects:
# Decode base64 to bytes
return base64.b64decode(data)
else:
# Convert text to bytes
return data.encode('utf-8')
# Check if this is supposed to be a binary file based on mime type
mimeType = file.mimeType
isTextFormat = self.isTextMimeType(mimeType)
if isTextFormat:
# This is a text file, encode to bytes as expected
return data.encode('utf-8')
else:
# This is a binary file that was incorrectly stored as text
# Try to decode it as if it was base64 (common fallback scenario)
try:
logger.warning(f"Binary file {fileId} ({mimeType}) was stored as text, attempting base64 decode")
return base64.b64decode(data)
except Exception as base64_error:
logger.error(f"Failed to decode binary file {fileId} as base64: {str(base64_error)}")
# Last resort: return the data as-is (might be corrupted)
logger.warning(f"Returning raw data for file {fileId} - file may be corrupted")
return data.encode('utf-8') if isinstance(data, str) else data
except Exception as e:
logger.error(f"Error processing file data for {fileId}: {str(e)}")
return None
@ -810,7 +861,11 @@ class ComponentObjects:
self.db.recordCreate("fileData", dataUpdate)
logger.debug(f"Created new file data for file ID {fileId} (base64Encoded: {base64Encoded})")
# Clear cache to ensure fresh data
self._clearTableCache("fileData")
return True
except Exception as e:
logger.error(f"Error updating data for file {fileId}: {str(e)}")
return False

File diff suppressed because it is too large Load diff

View file

@ -34,8 +34,7 @@ async def get_connections(
interface = getInterface(currentUser)
# Clear connections cache to ensure fresh data
if "connections" in interface.db._tablesCache:
del interface.db._tablesCache["connections"]
interface.db.clearTableCache("connections")
if currentUser.privilege in ['admin', 'sysadmin']:
# Admins can see all connections
@ -107,6 +106,9 @@ async def create_connection(
# Save connection record
interface.db.recordModify("connections", connection.id, connection_dict)
# Clear cache to ensure fresh data
interface.db.clearTableCache("connections")
return connection
except HTTPException:
@ -174,9 +176,13 @@ async def update_connection(
elif isinstance(connection_dict[field], (int, float)):
connection_dict[field] = datetime.fromtimestamp(connection_dict[field]).isoformat()
# Update connection record
# Update connection
interface.db.recordModify("connections", connectionId, connection_dict)
# Clear cache to ensure fresh data
interface.db.clearTableCache("connections")
# Get updated connection
return connection
except HTTPException:
@ -306,6 +312,9 @@ async def disconnect_service(
# Update connection record
interface.db.recordModify("connections", connectionId, connection.to_dict())
# Clear cache to ensure fresh data
interface.db.clearTableCache("connections")
return {"message": "Service disconnected successfully"}
except HTTPException:

View file

@ -332,6 +332,9 @@ async def auth_callback(code: str, state: str, request: Request) -> HTMLResponse
# Update connection record directly
rootInterface.db.recordModify("connections", connection_id, connection.to_dict())
# Clear cache to ensure fresh data
rootInterface.db.clearTableCache("connections")
# Save token
token = Token(
userId=user.id, # Use local user's ID

View file

@ -314,6 +314,9 @@ async def auth_callback(code: str, state: str, request: Request) -> HTMLResponse
# Update connection record directly
rootInterface.db.recordModify("connections", connection_id, connection.to_dict())
# Clear cache to ensure fresh data
rootInterface.db.clearTableCache("connections")
# Save token
token = Token(
userId=user.id, # Use local user's ID

View file

@ -163,6 +163,9 @@ def createUserSession(userId: str, tokenId: str, request: Request) -> Session:
# Save session to database
appInterface.db.recordCreate("sessions", session.to_dict())
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("sessions")
# Log auth event
event = AuthEvent(
userId=userId,
@ -173,6 +176,9 @@ def createUserSession(userId: str, tokenId: str, request: Request) -> Session:
)
appInterface.db.recordCreate("auth_events", event.to_dict())
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("auth_events")
return session
def logAuthEvent(userId: str, eventType: str, details: Dict[str, Any], request: Request) -> None:
@ -189,6 +195,9 @@ def logAuthEvent(userId: str, eventType: str, details: Dict[str, Any], request:
# Save event to database
appInterface.db.recordCreate("auth_events", event.to_dict())
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("auth_events")
def validateSession(sessionId: str) -> bool:
"""Validate a user session."""
@ -207,6 +216,9 @@ def validateSession(sessionId: str) -> bool:
"lastActivity": datetime.now(timezone.utc)
})
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("sessions")
return True
def revokeSession(sessionId: str) -> None:
@ -215,6 +227,9 @@ def revokeSession(sessionId: str) -> None:
# Delete session
appInterface.db.recordDelete("sessions", sessionId)
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("sessions")
def revokeAllUserSessions(userId: str) -> None:
"""Revoke all sessions for a user."""
@ -226,3 +241,6 @@ def revokeAllUserSessions(userId: str) -> None:
# Delete each session
for session in sessions:
appInterface.db.recordDelete("sessions", session["id"])
# Clear cache to ensure fresh data
appInterface.db.clearTableCache("sessions")