# gateway/modules/services/serviceWorkflow/mainServiceWorkflow.py
# 2025-10-14 14:47:50 +02:00
#
# 835 lines
# No EOL
# 42 KiB
# Python

import logging
import uuid
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User, UserConnection
from modules.datamodels.datamodelChat import ChatDocument, ChatMessage
from modules.datamodels.datamodelChat import ChatContentExtracted
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
from modules.shared.timezoneUtils import get_utc_timestamp
from modules.services.serviceAi.mainServiceAi import AiService
from modules.security.tokenManager import TokenManager
# Module-level logger named after this module; used by every WorkflowService method.
logger = logging.getLogger(__name__)
class WorkflowService:
    """Service class containing methods for document processing, chat operations, and workflow management"""

    def __init__(self, serviceCenter):
        """Capture the user, workflow and database interfaces exposed by ``serviceCenter``.

        Args:
            serviceCenter: Service container; must expose ``user``, ``workflow``,
                ``interfaceDbChat``, ``interfaceDbComponent`` and ``interfaceDbApp``.
        """
        self.services = serviceCenter
        self.user = serviceCenter.user
        self.workflow = serviceCenter.workflow
        self.interfaceDbChat = serviceCenter.interfaceDbChat
        self.interfaceDbComponent = serviceCenter.interfaceDbComponent
        self.interfaceDbApp = serviceCenter.interfaceDbApp

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _getCurrentWorkflow(self):
        """Return ``services.currentWorkflow`` when set, else the workflow captured in ``__init__``.

        ``self.workflow`` is only a snapshot taken at construction time. Every method that reads
        or writes workflow state goes through this helper so they all see the same workflow.
        """
        return getattr(self.services, 'currentWorkflow', None) or self.workflow

    @staticmethod
    def _enumValue(value) -> str:
        """Return ``value.value`` for enum members and ``str(value)`` for anything else."""
        return value.value if hasattr(value, 'value') else str(value)

    @staticmethod
    def _findNewestMessageByLabel(workflow, label: str):
        """Return the newest message whose ``documentsLabel`` equals ``label``, or None.

        Retries can produce several messages with the same label; the one with the highest
        ``publishedAt`` wins (earliest in list order on ties). A missing or None
        ``publishedAt`` is treated as 0 so the comparison never mixes None with numbers.
        """
        matching = [m for m in workflow.messages if getattr(m, 'documentsLabel', None) == label]
        if not matching:
            return None
        newest = max(matching, key=lambda m: getattr(m, 'publishedAt', None) or 0)
        logger.debug(f"Found {len(matching)} matching messages, using newest: {newest.id} (publishedAt: {getattr(newest, 'publishedAt', 'unknown')})")
        return newest

    @staticmethod
    def _collectMessageDocuments(message, doc_ref: str) -> List[ChatDocument]:
        """Return a copy of ``message.documents`` (empty list when it has none), logging what was found."""
        documents = list(message.documents or [])
        if documents:
            doc_names = [doc.fileName for doc in documents if hasattr(doc, 'fileName')]
            logger.debug(f"Found docList reference {doc_ref}: {len(documents)} documents - {doc_names}")
        else:
            logger.debug(f"Found docList reference {doc_ref} but message has no documents")
        return documents

    # ------------------------------------------------------------------
    # Chat
    # ------------------------------------------------------------------
    async def summarizeChat(self, messages: List[ChatMessage]) -> str:
        """
        Summarize the current round of a chat.

        Walks ``messages`` backwards from the newest entry and stops after (and including)
        the first message whose status is "first".

        Args:
            messages: List of chat messages to summarize, oldest first.
        Returns:
            str: Summary of the chat in the user's language, or an
            "Error summarizing chat: ..." string if anything fails.
        """
        try:
            # Collect messages from newest back to the round start (status == "first").
            relevantMessages = []
            for msg in reversed(messages):
                relevantMessages.append(msg)
                if msg.status == "first":
                    break
            # Restore chronological order for the prompt.
            history = "\n".join(f"- {msg.message}" for msg in reversed(relevantMessages))
            prompt = f"""You are an AI assistant providing a summary of a chat conversation.
Please respond in '{self.user.language}' language.
Chat History:
{history}
Instructions:
1. Summarize the conversation's key points and outcomes
2. Be concise but informative
3. Use a professional but friendly tone
4. Focus on important decisions and next steps if any
Please provide a comprehensive summary of this conversation."""
            # Get summary using AI service directly (avoiding circular dependency)
            ai_service = AiService(self)
            return await ai_service.callAi(
                prompt=prompt,
                documents=None,
                options={
                    "process_type": "text",
                    "operation_type": "generate_content",
                    "priority": "speed",
                    "compress_prompt": True,
                    "compress_documents": False,
                    "max_cost": 0.01
                }
            )
        except Exception as e:
            logger.error(f"Error summarizing chat: {str(e)}")
            return f"Error summarizing chat: {str(e)}"

    # ------------------------------------------------------------------
    # Document reference resolution
    # ------------------------------------------------------------------
    def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
        """Resolve document references against the current workflow's messages.

        Supported reference formats:
            * ``docItem:<documentId>:<fileName>``: the single document with that ID.
            * ``docList:<messageId>:<label>``: all documents of the message with that ID.
            * ``docList:<label>``: all documents of the newest message with that documentsLabel.
            * ``round<r>_task<t>_action<a>_<context>``: same as ``docList:<label>``.
        Other strings are ignored.

        Returns:
            The resolved documents in reference order. A missing ``docItem`` is skipped with a
            warning. An unresolvable ``docList``/label reference, or any other error, is logged
            and makes the whole call return an empty list.
        """
        try:
            workflow = self._getCurrentWorkflow()
            if not workflow:
                logger.error("No workflow available for document list resolution")
                return []
            all_documents = []
            for doc_ref in documentList:
                if doc_ref.startswith("docItem:"):
                    all_documents.extend(self._resolveDocItemReference(workflow, doc_ref))
                elif doc_ref.startswith("docList:"):
                    all_documents.extend(self._resolveDocListReference(workflow, doc_ref))
                elif doc_ref.startswith("round"):
                    all_documents.extend(self._resolveLabelReference(workflow, doc_ref))
            logger.debug(f"Resolved {len(all_documents)} documents from document list: {documentList}")
            return all_documents
        except Exception as e:
            logger.error(f"Error getting documents from document list: {str(e)}")
            return []

    def _resolveDocItemReference(self, workflow, doc_ref: str) -> List[ChatDocument]:
        """Resolve ``docItem:<id>:<fileName>`` to the first document with that ID (empty list if absent)."""
        parts = doc_ref.split(':')
        if len(parts) < 2:
            return []
        doc_id = parts[1]
        for message in workflow.messages:
            for doc in message.documents or []:
                if doc.id == doc_id:
                    logger.debug(f"Found docItem reference {doc_ref}: {getattr(doc, 'fileName', 'unknown')}")
                    # Stop at the first match so a document is never added twice.
                    return [doc]
        logger.warning(f"Document reference not found: {doc_ref}")
        return []

    def _resolveDocListReference(self, workflow, doc_ref: str) -> List[ChatDocument]:
        """Resolve ``docList:<messageId>:<label>`` or ``docList:<label>``.

        Raises:
            ValueError: If the referenced message or label does not exist.
        """
        # maxsplit=2 keeps any ':' inside the label intact.
        parts = doc_ref.split(':', 2)
        if len(parts) >= 3:
            message_id, label = parts[1], parts[2]
            logger.debug(f"Looking for message with ID: {message_id} and label: {label}")
            message = next((m for m in workflow.messages if str(m.id) == message_id), None)
            if message is None:
                available_ids = [str(msg.id) for msg in workflow.messages]
                logger.error(f"Message with ID {message_id} not found in workflow. Available message IDs: {available_ids}")
                raise ValueError(f"Document reference not found: docList:{message_id}:{label}")
            logger.debug(f"Found message {message.id} with documentsLabel: {getattr(message, 'documentsLabel', 'None')}")
            return self._collectMessageDocuments(message, doc_ref)
        label = parts[1]
        logger.debug(f"Looking for message with documentsLabel: {label}")
        message = self._findNewestMessageByLabel(workflow, label)
        if message is None:
            logger.error(f"No messages found with documentsLabel: {label}")
            raise ValueError(f"Document reference not found: docList:{label}")
        return self._collectMessageDocuments(message, doc_ref)

    def _resolveLabelReference(self, workflow, doc_ref: str) -> List[ChatDocument]:
        """Resolve a bare ``round<r>_task<t>_action<a>_<context>`` label to the newest matching message.

        References with fewer than four ``_``-separated parts are ignored (empty list).
        The numbers are not parsed: matching is an exact ``documentsLabel`` comparison, so a
        non-numeric label no longer aborts resolution of the whole list.

        Raises:
            ValueError: If no message carries this documentsLabel.
        """
        if len(doc_ref.split('_', 3)) < 4:
            return []
        logger.debug(f"Looking for messages with documentsLabel matching: {doc_ref}")
        message = self._findNewestMessageByLabel(workflow, doc_ref)
        if message is None:
            logger.error(f"No messages found with documentsLabel: {doc_ref}")
            raise ValueError(f"Document reference not found: {doc_ref}")
        return self._collectMessageDocuments(message, doc_ref)

    # ------------------------------------------------------------------
    # Connections
    # ------------------------------------------------------------------
    def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str:
        """Build a connection reference annotated with connection and token state.

        Format: ``connection:<authority>:<externalUsername> [status:<status>, token:<token_status>]``.
        ``token_status`` is one of "expired", "valid (refreshed)" (known creation time less
        than 300 time units ago; the original comment says 5 minutes), "valid",
        "no_expiration", "no_token" or "error: <message>".
        """
        token_status = "unknown"
        try:
            logger.debug(f"Getting fresh token for connection {connection.id}")
            token = TokenManager().getFreshToken(connection.id)
            if not token:
                token_status = "no_token"
            elif not getattr(token, 'expiresAt', None):
                token_status = "no_expiration"
            else:
                current_time = get_utc_timestamp()
                logger.debug(f"getConnectionReferenceFromUserConnection: Current time: {current_time}")
                logger.debug(f"getConnectionReferenceFromUserConnection: Token expires at: {token.expiresAt}")
                if current_time > token.expiresAt:
                    token_status = "expired"
                else:
                    created_at = getattr(token, 'createdAt', None)
                    # Only claim "refreshed" when the creation time is actually known;
                    # a missing createdAt used to count as "just refreshed".
                    if created_at and current_time - created_at < 300:  # 5 minutes
                        token_status = "valid (refreshed)"
                    else:
                        token_status = "valid"
        except Exception as e:
            token_status = f"error: {str(e)}"
        # Format: connection:msft:<username> (without UUID)
        base_ref = f"connection:{self._enumValue(connection.authority)}:{connection.externalUsername}"
        state_info = f" [status:{self._enumValue(connection.status)}, token:{token_status}]"
        logger.debug(f"getConnectionReferenceFromUserConnection: Built reference: {base_ref + state_info}")
        return base_ref + state_info

    def getUserConnectionByExternalUsername(self, authority: str, externalUsername: str) -> Optional[UserConnection]:
        """Return the current user's connection matching ``authority`` (case-insensitive) and ``externalUsername``.

        Returns None when either argument is empty, nothing matches, or lookup fails.
        """
        try:
            if not authority or not externalUsername:
                return None
            wanted_authority = authority.lower()
            for connection in self.interfaceDbApp.getUserConnections(self.user.id):
                # Normalize authority for comparison (enum vs string)
                if (self._enumValue(connection.authority).lower() == wanted_authority
                        and connection.externalUsername == externalUsername):
                    return connection
            return None
        except Exception as e:
            logger.error(f"Error getting connection by external username: {str(e)}")
            return None

    def getUserConnectionFromConnectionReference(self, connectionReference: str) -> Optional[UserConnection]:
        """Resolve a reference produced by ``getConnectionReferenceFromUserConnection``.

        Accepts ``connection:<authority>:<username>`` with or without the trailing
        `` [status:..., token:...]`` suffix. Returns None for malformed references or when no
        connection of the current user matches.
        """
        try:
            # Strip the optional state suffix; maxsplit=2 lets usernames contain ':'.
            base_reference = connectionReference.split(' [')[0]
            parts = base_reference.split(':', 2)
            if len(parts) != 3 or parts[0] != "connection":
                return None
            _, authority, username = parts
            # Same matching rules as the external-username lookup (enum or string authority).
            return self.getUserConnectionByExternalUsername(authority, username)
        except Exception as e:
            logger.error(f"Error parsing connection reference: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Files
    # ------------------------------------------------------------------
    def getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Return basic metadata for ``fileId`` as a dict, or None if the file does not exist."""
        file_item = self.interfaceDbComponent.getFile(fileId)
        if not file_item:
            return None
        return {
            "id": file_item.id,
            "fileName": file_item.fileName,
            "size": file_item.fileSize,
            "mimeType": file_item.mimeType,
            "fileHash": file_item.fileHash,
            "creationDate": file_item.creationDate
        }

    def getFileData(self, fileId: str) -> bytes:
        """Get file data by ID (delegates to the component interface)."""
        return self.interfaceDbComponent.getFileData(fileId)

    def _diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]:
        """
        Diagnose document access issues and provide recovery information.

        Reports whether the document has a component interface set and whether its file
        can be found through ``self.interfaceDbComponent``. Never raises; errors are
        reported in ``error_details``.
        """
        try:
            diagnosis = {
                'document_id': document.id,
                'file_id': document.fileId,
                'has_component_interface': document._componentInterface is not None,
                'component_interface_type': type(document._componentInterface).__name__ if document._componentInterface else None,
                'file_exists': False,
                'file_info': None,
                'error_details': None
            }
            # Without a component interface the document cannot reach the file store.
            if not document._componentInterface:
                diagnosis['error_details'] = "Component interface not set - document cannot access file system"
                return diagnosis
            # Try to access the file directly
            try:
                file_info = self.interfaceDbComponent.getFile(document.fileId)
                if file_info:
                    diagnosis['file_exists'] = True
                    diagnosis['file_info'] = {
                        'fileName': file_info.fileName if hasattr(file_info, 'fileName') else 'N/A',
                        'fileSize': file_info.fileSize if hasattr(file_info, 'fileSize') else 'N/A',
                        'mimeType': file_info.mimeType if hasattr(file_info, 'mimeType') else 'N/A'
                    }
                else:
                    diagnosis['error_details'] = f"File with ID {document.fileId} not found in component interface"
            except Exception as e:
                diagnosis['error_details'] = f"Error accessing file {document.fileId}: {str(e)}"
            return diagnosis
        except Exception as e:
            return {
                'document_id': document.id if hasattr(document, 'id') else 'unknown',
                'file_id': document.fileId if hasattr(document, 'fileId') else 'unknown',
                'error_details': f"Error during diagnosis: {str(e)}"
            }

    def _recoverDocumentAccess(self, document: ChatDocument) -> bool:
        """
        Attempt to recover document access by re-setting the component interface.

        Returns True if ``document.fileName`` is readable afterwards, False otherwise.
        """
        try:
            logger.info(f"Attempting to recover document access for document {document.id}")
            document.setComponentInterface(self.interfaceDbComponent)
            # Reading fileName is the access test.
            try:
                test_fileName = document.fileName
                logger.info(f"Document access recovered for {document.id} -> {test_fileName}")
                return True
            except Exception as e:
                logger.error(f"Document access recovery failed for {document.id}: {str(e)}")
                return False
        except Exception as e:
            logger.error(f"Error during document access recovery for {document.id}: {str(e)}")
            return False

    def calculateObjectSize(self, obj: Any) -> int:
        """
        Calculate the size of an object in bytes.

        The size is the UTF-8 length of its JSON serialization; values JSON cannot encode
        are serialized with ``str``.

        Args:
            obj: Object to calculate size for
        Returns:
            int: Size in bytes (0 for None or on error)
        """
        try:
            import json
            if obj is None:
                return 0
            json_str = json.dumps(obj, ensure_ascii=False, default=str)
            return len(json_str.encode('utf-8'))
        except Exception as e:
            logger.error(f"Error calculating object size: {str(e)}")
            return 0

    # ------------------------------------------------------------------
    # Workflow context and persistence
    # ------------------------------------------------------------------
    def getWorkflowContext(self) -> Dict[str, int]:
        """Return currentRound/currentTask/currentAction of the active workflow (0 when unavailable).

        Reads the same workflow that ``setWorkflowContext`` writes to.
        """
        try:
            workflow = self._getCurrentWorkflow()
            return {
                'currentRound': getattr(workflow, 'currentRound', 0),
                'currentTask': getattr(workflow, 'currentTask', 0),
                'currentAction': getattr(workflow, 'currentAction', 0)
            }
        except Exception as e:
            logger.error(f"Error getting workflow context: {str(e)}")
            return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}

    def setWorkflowContext(self, round_number: int = None, task_number: int = None, action_number: int = None):
        """Set round/task/action on the active workflow and persist the changed fields.

        Arguments left as None are not modified. Errors are logged, not raised.
        """
        try:
            workflow = self._getCurrentWorkflow()
            if not workflow:
                logger.error("No workflow available for context setting")
                return
            update_data = {}
            if round_number is not None:
                workflow.currentRound = round_number
                update_data["currentRound"] = round_number
            if task_number is not None:
                workflow.currentTask = task_number
                update_data["currentTask"] = task_number
            if action_number is not None:
                workflow.currentAction = action_number
                update_data["currentAction"] = action_number
            # Persist changes to database if any updates were made
            if update_data:
                self.interfaceDbChat.updateWorkflow(workflow.id, update_data)
            logger.debug(f"Updated workflow context: Round {getattr(workflow, 'currentRound', 'N/A')}, Task {getattr(workflow, 'currentTask', 'N/A')}, Action {getattr(workflow, 'currentAction', 'N/A')}")
        except Exception as e:
            logger.error(f"Error setting workflow context: {str(e)}")

    def getWorkflowStats(self) -> Dict[str, Any]:
        """Return the workflow context plus totals, status and ID of the active workflow.

        Missing attributes fall back to 0 / 'unknown'; errors are logged and yield the
        all-default dict.
        """
        try:
            workflow_context = self.getWorkflowContext()
            workflow = self._getCurrentWorkflow()
            return {
                'currentRound': workflow_context['currentRound'],
                'currentTask': workflow_context['currentTask'],
                'currentAction': workflow_context['currentAction'],
                'totalTasks': getattr(workflow, 'totalTasks', 0),
                'totalActions': getattr(workflow, 'totalActions', 0),
                'workflowStatus': getattr(workflow, 'status', 'unknown'),
                'workflowId': getattr(workflow, 'id', 'unknown')
            }
        except Exception as e:
            logger.error(f"Error getting workflow stats: {str(e)}")
            return {
                'currentRound': 0,
                'currentTask': 0,
                'currentAction': 0,
                'totalTasks': 0,
                'totalActions': 0,
                'workflowStatus': 'unknown',
                'workflowId': 'unknown'
            }

    def createWorkflow(self, workflowData: Dict[str, Any]):
        """Create a new workflow by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.createWorkflow(workflowData)
        except Exception as e:
            logger.error(f"Error creating workflow: {str(e)}")
            raise

    def updateWorkflow(self, workflowId: str, updateData: Dict[str, Any]):
        """Update workflow by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.updateWorkflow(workflowId, updateData)
        except Exception as e:
            logger.error(f"Error updating workflow: {str(e)}")
            raise

    def updateWorkflowStats(self, workflowId: str, **kwargs):
        """Update workflow statistics by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.updateWorkflowStats(workflowId, **kwargs)
        except Exception as e:
            logger.error(f"Error updating workflow stats: {str(e)}")
            raise

    def getWorkflow(self, workflowId: str):
        """Get workflow by ID by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.getWorkflow(workflowId)
        except Exception as e:
            logger.error(f"Error getting workflow: {str(e)}")
            raise

    def createMessage(self, messageData: Dict[str, Any]):
        """Create a new message by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.createMessage(messageData)
        except Exception as e:
            logger.error(f"Error creating message: {str(e)}")
            raise

    def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
        """Update message by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.updateMessage(messageId, messageData)
        except Exception as e:
            logger.error(f"Error updating message: {str(e)}")
            raise

    def createLog(self, logData: Dict[str, Any]):
        """Create a new log entry by delegating to the chat interface (errors are logged and re-raised)."""
        try:
            return self.interfaceDbChat.createLog(logData)
        except Exception as e:
            logger.error(f"Error creating log: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Prompt context builders
    # ------------------------------------------------------------------
    def getDocumentCount(self) -> str:
        """Return "<n> document(s) available" across all workflow messages, or "No documents available"."""
        try:
            workflow = self._getCurrentWorkflow()
            if not workflow:
                return "No documents available"
            total_docs = sum(
                len(message.documents)
                for message in workflow.messages
                if getattr(message, 'documents', None)
            )
            if total_docs == 0:
                return "No documents available"
            return f"{total_docs} document(s) available"
        except Exception as e:
            logger.error(f"Error getting document count: {str(e)}")
            return "No documents available"

    def getWorkflowHistoryContext(self) -> str:
        """Describe document lists from previous rounds for task planning.

        Returns one ``- docList:... (<n> documents)`` line per history exchange, or
        "No previous round context available" when there is nothing to show or on error.
        """
        try:
            workflow = self._getCurrentWorkflow()
            if not workflow:
                return "No previous round context available"
            # Round boundaries are marked by messages with status "first".
            if not any(getattr(message, 'status', None) == "first" for message in workflow.messages):
                return "No previous round context available"
            document_list = self._getDocumentReferenceList(workflow)
            if not document_list["history"]:
                return "No previous round context available"
            lines = ["Previous workflow rounds contain documents:\n"]
            for exchange in document_list["history"]:
                doc_list_ref = self._formatDocListReference(workflow, exchange)
                lines.append(f"- {doc_list_ref} ({len(exchange['documents'])} documents)\n")
            return "".join(lines)
        except Exception as e:
            logger.error(f"Error getting workflow history context: {str(e)}")
            return "No previous round context available"

    def getAvailableDocuments(self, workflow) -> str:
        """Build the document index shown to the AI for action planning.

        The workflow is reloaded from the database first; if the reload fails or finds
        nothing, the passed-in workflow is used. Current-round document lists come first,
        then past rounds, each as a ``docList:`` line followed by its ``docItem:`` entries.
        """
        try:
            if not workflow or not hasattr(workflow, 'messages'):
                return "No documents available"
            # Reload workflow from database to ensure we have all messages
            if hasattr(workflow, 'id'):
                try:
                    reloaded = self.getWorkflow(workflow.id)
                except Exception as e:
                    logger.warning(f"Could not reload workflow from database: {str(e)}")
                else:
                    if reloaded is not None and hasattr(reloaded, 'messages'):
                        workflow = reloaded
                    else:
                        logger.warning(f"Could not reload workflow from database: workflow {workflow.id} not found")
            document_list = self._getDocumentReferenceList(workflow)
            context = ""
            if document_list["chat"]:
                context += self._formatExchangeSection(workflow, "Current round documents:", document_list["chat"])
            if document_list["history"]:
                context += self._formatExchangeSection(workflow, "Past rounds documents:", document_list["history"])
            if not document_list["chat"] and not document_list["history"]:
                context += "\nNO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n"
            return context
        except Exception as e:
            logger.error(f"Error getting available documents: {str(e)}")
            return "NO DOCUMENTS AVAILABLE - Error generating document context."

    def _formatExchangeSection(self, workflow, title: str, exchanges: List[Dict[str, Any]]) -> str:
        """Render one titled section of ``getAvailableDocuments``: a docList line plus its docItem lines per exchange."""
        lines = [f"\n{title}\n"]
        for exchange in exchanges:
            lines.append(f"- {self._formatDocListReference(workflow, exchange)} contains:\n")
            for doc_ref in exchange['documents']:
                # Convert to proper docItem format if needed
                item_ref = doc_ref if doc_ref.startswith("docItem:") else f"docItem:{doc_ref}"
                lines.append(f" - {item_ref}\n")
            lines.append("\n")
        return "".join(lines)

    def _formatDocListReference(self, workflow, exchange: Dict[str, Any]) -> str:
        """Return ``docList:<messageId>:<label>`` for an exchange, or ``docList:<label>`` if no message is known.

        Prefers the exchange's own ``messageId``, so exchanges that share a label (retries)
        point at their own message rather than the first one carrying that label.
        """
        label = exchange['documentsLabel']
        message_id = exchange.get('messageId')
        if not message_id:
            message_id = next(
                (m.id for m in workflow.messages if getattr(m, 'documentsLabel', None) == label),
                None,
            )
        if message_id:
            return f"docList:{message_id}:{label}"
        return f"docList:{label}"

    def _getDocumentReferenceList(self, workflow) -> Dict[str, List]:
        """Group labelled document lists into the current round ("chat") and earlier rounds ("history").

        Walks messages newest-first; everything up to and including the newest message with
        status "first" belongs to the current round. Messages without a ``documentsLabel``
        are skipped (labels are never synthesized here). Each exchange is a dict with
        ``documentsLabel``, ``documents`` (``docItem:`` references) and ``messageId``. Both
        lists are sorted by message ``sequenceNr``, most recent first. File attributes of
        all documents are refreshed from the component interface first.
        """
        all_documents = [doc for message in workflow.messages if message.documents for doc in message.documents]
        if all_documents:
            self._refreshDocumentFileAttributes(all_documents)
        chat_exchanges = []
        history_exchanges = []
        in_current_round = True
        for message in reversed(workflow.messages):
            is_first = getattr(message, 'status', None) == "first"
            if message.documents and getattr(message, 'documentsLabel', None):
                doc_exchange = {
                    'documentsLabel': self._validateDocumentLabelConsistency(message),
                    'documents': [self._getDocumentReferenceFromChatDocument(doc, message) for doc in message.documents],
                    # Keep the owning message so docList references point at it, not at an
                    # older retry that happens to share the same label.
                    'messageId': message.id,
                }
                (chat_exchanges if in_current_round else history_exchanges).append(doc_exchange)
            # Flip boundary after including the "first" message in chat
            if in_current_round and is_first:
                in_current_round = False
        chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
        history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
        return {
            "chat": chat_exchanges,
            "history": history_exchanges
        }

    def _refreshDocumentFileAttributes(self, documents) -> None:
        """Update fileName, fileSize and mimeType of each document from its stored file (errors are logged)."""
        for doc in documents:
            try:
                file_info = self.getFileInfo(doc.fileId)
                if file_info:
                    doc.fileName = file_info.get("fileName", doc.fileName)
                    doc.fileSize = file_info.get("size", doc.fileSize)
                    doc.mimeType = file_info.get("mimeType", doc.mimeType)
                else:
                    logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
            except Exception as e:
                logger.error(f"Error refreshing file attributes for document {doc.id}: {e}")

    def _generateWorkflowContextPrefix(self, message) -> str:
        """Generate workflow context prefix: round{num}_task{num}_action{num} (defaults 1/0/0 when missing)."""
        round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1
        task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0
        action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0
        return f"round{round_num}_task{task_num}_action{action_num}"

    def _getDocumentReferenceFromChatDocument(self, document, message) -> str:
        """Return ``docItem:<documentId>:<fileName>``; errors are logged and re-raised."""
        try:
            return f"docItem:{document.id}:{document.fileName}"
        except Exception as e:
            logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}")
            # Re-raise the error to prevent workflow from continuing with invalid data
            raise

    def _getMessageSequenceForExchange(self, exchange, workflow) -> int:
        """Return the ``sequenceNr`` of the message an exchange came from (0 if unknown).

        Uses the exchange's ``messageId`` when present; otherwise locates the message via the
        first document reference (``docItem:<docId>:...`` or ``docList:<messageId>:...``).
        A missing or None sequenceNr counts as 0 so sorting never compares None with int.
        """
        try:
            message_id = exchange.get('messageId')
            if message_id is not None:
                for message in workflow.messages:
                    if message.id == message_id:
                        return getattr(message, 'sequenceNr', 0) or 0
            documents = exchange.get('documents') or []
            if not documents:
                return 0
            first_doc_ref = documents[0]
            parts = first_doc_ref.split(':')
            if len(parts) < 2:
                return 0
            if first_doc_ref.startswith("docItem:"):
                doc_id = parts[1]
                for message in workflow.messages:
                    if message.documents and any(doc.id == doc_id for doc in message.documents):
                        return getattr(message, 'sequenceNr', 0) or 0
            elif first_doc_ref.startswith("docList:"):
                ref_message_id = parts[1]
                for message in workflow.messages:
                    if str(message.id) == ref_message_id:
                        return getattr(message, 'sequenceNr', 0) or 0
            return 0
        except Exception as e:
            logger.error(f"Error getting message sequence for exchange: {str(e)}")
            return 0

    def _validateDocumentLabelConsistency(self, message) -> Optional[str]:
        """Return the message's own ``documentsLabel`` (None if missing or empty); no correction is applied."""
        if not hasattr(message, 'documentsLabel') or not message.documentsLabel:
            return None
        return message.documentsLabel

    def getConnectionReferenceList(self) -> List[str]:
        """Return reference strings for all connections of the service center's user ([] on error)."""
        try:
            if hasattr(self.services, 'interfaceDbApp') and hasattr(self.services, 'user'):
                connections = self.services.interfaceDbApp.getUserConnections(self.services.user.id)
                if connections:
                    # Same format getUserConnectionFromConnectionReference() parses.
                    return [self.getConnectionReferenceFromUserConnection(conn) for conn in connections]
            return []
        except Exception as e:
            logger.error(f"Error getting connection reference list: {str(e)}")
            return []