977 lines
48 KiB
Python
977 lines
48 KiB
Python
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from modules.datamodels.datamodelUam import User, UserConnection
|
|
from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat, ChatLog
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
|
|
from modules.security.tokenManager import TokenManager
|
|
from modules.shared.progressLogger import ProgressLogger
|
|
|
|
# Module-level logger used by every ChatService method in this file.
logger = logging.getLogger(__name__)
|
|
|
|
class ChatService:
|
|
"""Service class containing methods for document processing, chat operations, and workflow management"""
|
|
|
|
def __init__(self, serviceCenter):
    """Bind the service center and cache the database interfaces this service uses.

    Args:
        serviceCenter: Aggregate object exposing ``user``, ``interfaceDbChat``,
            ``interfaceDbComponent`` and ``interfaceDbApp``. The active workflow is
            not cached here; methods read ``self.services.workflow`` each time.
    """
    center = serviceCenter
    self.services = center
    self.user = center.user
    # The ChatWorkflow object lives on self.services.workflow (stable during
    # workflow execution) and is intentionally looked up on demand.
    self.interfaceDbChat = center.interfaceDbChat
    self.interfaceDbComponent = center.interfaceDbComponent
    self.interfaceDbApp = center.interfaceDbApp
    # Progress logger placeholder; starts unset.
    self._progressLogger = None
|
|
|
|
def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
    """Resolve document references against the active workflow's messages.

    Supported reference formats:

    * ``docItem:<documentId>[:<fileName>]`` - one document, looked up by id.
      The first match wins, so a document attached to several messages is
      returned only once per reference.
    * ``docList:<messageId>:<label>`` - all documents of the message with that
      id. An id not present in the current workflow is treated as stale and the
      reference is skipped; there is deliberately no fallback to label matching,
      which could pick the wrong message.
    * ``docList:<label>`` - all documents of the first message whose
      ``documentsLabel`` equals ``<label>``.
    * anything else - a bare ``documentsLabel`` (e.g.
      ``round1_task2_action3_contextinfo``); the documents of the newest
      matching message by ``publishedAt`` are used.

    Only messages whose ``workflowId`` equals the active workflow's id are
    considered; foreign messages and messages without a workflowId are ignored.

    Args:
        documentList: Reference strings in any of the formats above
            (``None`` is treated as empty).

    Returns:
        The resolved documents in reference order. ``[]`` is returned when no
        workflow is active or when any error occurs. Note that a bare label
        matching no message raises internally and therefore discards every
        document resolved so far.
    """
    try:
        # self.services.workflow is the ChatWorkflow object (stable during workflow execution)
        workflow = self.services.workflow
        if not workflow:
            logger.error("getChatDocumentsFromDocumentList: No workflow available (self.services.workflow is not set)")
            return []

        workflowId = workflow.id if hasattr(workflow, 'id') else 'NO_ID'
        logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
        logger.debug(f"getChatDocumentsFromDocumentList: using workflow.id = {workflowId}, workflow object id = {id(workflow)}")

        # Root cause analysis: verify workflow.messages integrity and detect workflow changes
        self._verifyWorkflowMessagesIntegrity(workflow, workflowId)

        # A None message list simply resolves nothing instead of raising mid-loop.
        messages = getattr(workflow, 'messages', None) or []

        def _belongsToWorkflow(message) -> bool:
            # Messages without a workflowId are treated as foreign as well.
            msgWorkflowId = getattr(message, 'workflowId', None)
            return bool(msgWorkflowId) and msgWorkflowId == workflowId

        def _ownMessages(onSkip=None):
            # Yield this workflow's messages in list order; report each skipped one via onSkip.
            for message in messages:
                if _belongsToWorkflow(message):
                    yield message
                elif onSkip is not None:
                    onSkip(message, getattr(message, 'workflowId', None))

        def _warnSkipped(message, msgWorkflowId):
            # Skip reporter used by the docList:<label> lookup.
            if msgWorkflowId:
                logger.warning(f"Message {message.id} has workflowId {msgWorkflowId} but belongs to workflow {workflowId}. Skipping.")
            else:
                logger.warning(f"Message {message.id} has no workflowId. Skipping.")

        def _debugSkipped(message, msgWorkflowId):
            # Skip reporter used by the bare-label lookup.
            if msgWorkflowId:
                logger.debug(f"Skipping message {message.id} with workflowId {msgWorkflowId} (expected {workflowId})")
            else:
                logger.debug(f"Skipping message {message.id} with no workflowId (expected {workflowId})")

        def _logAvailableMessages():
            # Debug aid only: list this workflow's messages and flag foreign ones.
            if not messages:
                logger.debug("getChatDocumentsFromDocumentList: no messages available on current workflow")
                return
            msgLines = []
            foreignLines = []
            for message in messages:
                msgId = getattr(message, 'id', None)
                label = getattr(message, 'documentsLabel', None)
                msgWorkflowId = getattr(message, 'workflowId', None)
                if not _belongsToWorkflow(message):
                    foreignLines.append(f"id={msgId}, label={label}, workflowId={msgWorkflowId or 'Missing'}")
                    continue
                docNames = [
                    getattr(doc, 'fileName', None) or getattr(doc, 'documentName', None) or 'Unnamed'
                    for doc in (getattr(message, 'documents', None) or [])
                ]
                msgLines.append(f"- id={msgId}, label={label}, workflowId={msgWorkflowId}, docs={docNames}")
            if msgLines:
                logger.debug("getChatDocumentsFromDocumentList: available messages (filtered for workflow):\n" + "\n".join(msgLines))
            if foreignLines:
                logger.warning(f"getChatDocumentsFromDocumentList: Found {len(foreignLines)} messages from other workflows in workflow.messages list:\n" + "\n".join(foreignLines))

        try:
            _logAvailableMessages()
        except Exception as e:
            logger.debug(f"getChatDocumentsFromDocumentList: unable to enumerate messages for debug: {e}")

        allDocuments = []
        for docRef in documentList or []:
            if docRef.startswith("docItem:"):
                # docItem:<id>:<filename> - only the id takes part in the lookup
                parts = docRef.split(':')
                if len(parts) >= 2:
                    docId = parts[1]
                    # Stop at the first hit so a shared document is not added repeatedly.
                    match = next(
                        (doc for message in _ownMessages() for doc in (message.documents or []) if doc.id == docId),
                        None,
                    )
                    if match is not None:
                        allDocuments.append(match)
            elif docRef.startswith("docList:"):
                parts = docRef.split(':')
                if len(parts) >= 3:
                    # Format: docList:<messageId>:<label>
                    messageId = parts[1]
                    label = parts[2]
                    messageFound = next((m for m in _ownMessages() if str(m.id) == messageId), None)
                    if messageFound is None:
                        # Stale reference: never fall back to label matching, which could match the wrong message.
                        availableIds = [str(msg.id) for msg in messages]
                        logger.warning(f"Document reference contains stale message ID {messageId} not found in current workflow {workflowId}. Label: {label}. Available message IDs: {availableIds}")
                        logger.warning("This indicates the document reference was created in a different workflow state. Skipping this reference.")
                        continue
                    if messageFound.documents:
                        allDocuments.extend(messageFound.documents)
                elif len(parts) >= 2:
                    # Format: docList:<label> - first message carrying that documentsLabel
                    label = parts[1]
                    messageFound = next(
                        (m for m in _ownMessages(_warnSkipped) if getattr(m, 'documentsLabel', None) == label),
                        None,
                    )
                    if messageFound is not None and messageFound.documents:
                        allDocuments.extend(messageFound.documents)
            else:
                # Bare label (e.g. round1_task2_action3_contextinfo): use the newest message carrying it.
                matchingMessages = [
                    m for m in _ownMessages(_debugSkipped)
                    if getattr(m, 'documentsLabel', '') == docRef
                ]
                if not matchingMessages:
                    logger.error(f"No messages found with documentsLabel: {docRef}")
                    raise ValueError(f"Document reference not found: {docRef}")
                # publishedAt may be missing OR None; `or 0` keeps keys comparable.
                # max() returns the first maximal element, matching a stable descending sort.
                newestMessage = max(matchingMessages, key=lambda m: getattr(m, 'publishedAt', None) or 0)
                if newestMessage.documents:
                    docNames = [doc.fileName for doc in newestMessage.documents if hasattr(doc, 'fileName')]
                    logger.debug(f"Added {len(newestMessage.documents)} documents from newest message {newestMessage.id}: {docNames}")
                    allDocuments.extend(newestMessage.documents)
                else:
                    logger.debug(f"No documents found in newest message {newestMessage.id}")

        logger.debug(f"Resolved {len(allDocuments)} documents from document list: {documentList}")
        return allDocuments
    except Exception as e:
        logger.error(f"Error getting documents from document list: {str(e)}")
        return []
|
|
|
|
def _verifyWorkflowMessagesIntegrity(self, workflow, expectedWorkflowId: str) -> None:
    """Log (never raise) when a workflow's message list looks inconsistent.

    Checks performed:
      * messages in ``workflow.messages`` whose ``workflowId`` differs from
        ``expectedWorkflowId`` are logged at ERROR level;
      * messages with no ``workflowId`` at all are logged at WARNING level;
      * if ``self.services.workflow`` now has a different id, that swap is
        logged at ERROR level.

    Any exception raised during inspection is swallowed and logged at DEBUG
    level, since this is purely a diagnostic aid.
    """
    try:
        if not workflow:
            return
        messages = getattr(workflow, 'messages', None)
        if not messages:
            return

        foreign = []
        missing = []
        for msg in messages:
            ownerId = getattr(msg, 'workflowId', None)
            if not ownerId:
                missing.append({
                    'id': getattr(msg, 'id', 'unknown'),
                    'label': getattr(msg, 'documentsLabel', None),
                })
                continue
            if ownerId != expectedWorkflowId:
                foreign.append({
                    'id': getattr(msg, 'id', 'unknown'),
                    'label': getattr(msg, 'documentsLabel', None),
                    'workflowId': ownerId,
                    'expectedWorkflowId': expectedWorkflowId,
                })

        if foreign:
            logger.error(
                f"CRITICAL: Workflow integrity violation detected! "
                f"Workflow {expectedWorkflowId} contains {len(foreign)} messages from other workflows. "
                f"Total messages: {len(messages)}. "
                f"Foreign messages: {foreign}"
            )

        if missing:
            logger.warning(
                f"Workflow integrity issue: Workflow {expectedWorkflowId} contains {len(missing)} messages without workflowId. "
                f"Messages: {missing}"
            )

        # Detect whether the service's active workflow was swapped for another one.
        active = self.services.workflow
        if active and hasattr(active, 'id'):
            activeId = active.id
            if activeId != expectedWorkflowId:
                logger.error(
                    f"CRITICAL: Workflow object changed during execution! "
                    f"Expected workflow {expectedWorkflowId}, but self.services.workflow now points to {activeId}. "
                    f"This indicates the workflow object was swapped mid-execution."
                )
    except Exception as e:
        logger.debug(f"Error during workflow integrity verification: {e}")
|
|
|
|
def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str:
    """Build a readable reference string for a user connection, including state info.

    Format: ``connection:<authority>:<externalUsername> [status:<status>, token:<tokenStatus>]``
    (no connection UUID). ``tokenStatus`` is one of ``no_token``,
    ``no_expiration``, ``expired``, ``valid``, ``valid (refreshed)`` or
    ``error: <message>``.

    Args:
        connection: The connection to describe. ``authority`` and ``status``
            may be Enum members (their ``.value`` is used) or plain values.

    Returns:
        The reference string. Token lookup failures are reported in the
        ``token:`` part rather than raised.
    """
    tokenStatus = "unknown"
    try:
        # Get a fresh token via TokenManager convenience method
        logger.debug(f"Getting fresh token for connection {connection.id}")
        token = TokenManager().getFreshToken(connection.id)
        if not token:
            tokenStatus = "no_token"
        elif not getattr(token, 'expiresAt', None):
            tokenStatus = "no_expiration"
        else:
            currentTime = self.services.utils.timestampGetUtc()
            if currentTime > token.expiresAt:
                tokenStatus = "expired"
            else:
                # "Refreshed" only when the creation time is known and within the
                # last 5 minutes; a missing/None createdAt means "unknown age".
                createdAt = getattr(token, 'createdAt', None)
                if createdAt is not None and currentTime - createdAt < 300:
                    tokenStatus = "valid (refreshed)"
                else:
                    tokenStatus = "valid"
    except Exception as e:
        tokenStatus = f"error: {str(e)}"

    def _plainValue(value):
        # Enum members expose .value; plain strings are used as-is.
        return getattr(value, 'value', value)

    baseRef = f"connection:{_plainValue(connection.authority)}:{connection.externalUsername}"
    stateInfo = f" [status:{_plainValue(connection.status)}, token:{tokenStatus}]"
    reference = baseRef + stateInfo
    logger.debug(f"getConnectionReferenceFromUserConnection: Built reference: {reference}")
    return reference
|
|
|
|
def getUserConnectionByExternalUsername(self, authority: str, externalUsername: str) -> Optional[UserConnection]:
    """Return the current user's connection matching ``authority`` and ``externalUsername``.

    The authority comparison is case-insensitive. A connection's ``authority``
    may be an Enum member (its ``.value`` is compared) or any other value
    (compared via ``str``). The username must match exactly.

    Returns:
        The first matching connection, or ``None`` when either argument is
        empty, nothing matches, or the lookup raises (the error is logged).
    """
    try:
        if not (authority and externalUsername):
            return None
        for candidate in self.interfaceDbApp.getUserConnections(self.user.id):
            rawAuthority = candidate.authority
            if hasattr(rawAuthority, 'value'):
                candidateAuthority = rawAuthority.value
            else:
                candidateAuthority = str(rawAuthority)
            sameAuthority = candidateAuthority.lower() == authority.lower()
            if sameAuthority and candidate.externalUsername == externalUsername:
                return candidate
        return None
    except Exception as e:
        logger.error(f"Error getting connection by external username: {str(e)}")
        return None
|
|
|
|
def getUserConnectionFromConnectionReference(self, connectionReference: str) -> Optional[UserConnection]:
    """Resolve a ``connection:<authority>:<username>`` reference to a user connection.

    An optional trailing `` [status:..., token:...]`` suffix (as produced by
    ``getConnectionReferenceFromUserConnection``) is ignored. Everything after
    the second ``:`` is taken as the username, so usernames containing ``:``
    are supported.

    Args:
        connectionReference: The reference string.

    Returns:
        The current user's connection whose authority value equals
        ``<authority>`` and whose ``externalUsername`` equals ``<username>``, or
        ``None`` if the reference is empty/malformed, nothing matches, or the
        lookup fails (the error is logged).
    """
    try:
        if not connectionReference:
            return None

        # Drop the optional " [status:..., token:...]" state suffix.
        baseReference = connectionReference.split(' [')[0].strip()

        # maxsplit=2 keeps usernames that themselves contain ':' intact.
        parts = baseReference.split(':', 2)
        if len(parts) != 3 or parts[0] != "connection":
            return None
        authority, username = parts[1], parts[2]

        for conn in self.interfaceDbApp.getUserConnections(self.user.id):
            # authority may be an Enum member or a plain string.
            connAuthority = getattr(conn.authority, 'value', conn.authority)
            if connAuthority == authority and conn.externalUsername == username:
                return conn
        return None
    except Exception as e:
        logger.error(f"Error parsing connection reference: {str(e)}")
        return None
|
|
|
|
def getFreshConnectionToken(self, connectionId: str):
    """Return a fresh token for one connection via ``TokenManager`` (moved from UtilsService).

    Args:
        connectionId: ID of the connection whose token is requested.

    Returns:
        Whatever ``TokenManager().getFreshToken(connectionId)`` returns
        (presumably ``None`` when not found/expired - depends on TokenManager),
        or ``None`` when that call raises; the error is logged.
    """
    try:
        manager = TokenManager()
        return manager.getFreshToken(connectionId)
    except Exception as e:
        logger.error(f"Error getting fresh token for connection {connectionId}: {str(e)}")
        return None
|
|
|
|
def getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
    """Return basic metadata for a stored file.

    Args:
        fileId: ID of the file in the component store.

    Returns:
        A dict with keys ``id``, ``fileName``, ``size`` (the file's
        ``fileSize``), ``mimeType``, ``fileHash`` and ``creationDate``, or
        ``None`` when ``interfaceDbComponent.getFile`` finds nothing.
    """
    fileItem = self.interfaceDbComponent.getFile(fileId)
    if not fileItem:
        return None
    return {
        "id": fileItem.id,
        "fileName": fileItem.fileName,
        "size": fileItem.fileSize,
        "mimeType": fileItem.mimeType,
        "fileHash": fileItem.fileHash,
        "creationDate": fileItem.creationDate,
    }
|
|
|
|
def getFileData(self, fileId: str) -> bytes:
    """Return the raw content of file ``fileId`` from the component interface.

    Errors raised by the interface propagate unchanged.
    """
    componentDb = self.interfaceDbComponent
    return componentDb.getFileData(fileId)
|
|
|
|
def _diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]:
    """Collect diagnostics explaining why a document's file properties may be inaccessible.

    Args:
        document: The document to inspect.

    Returns:
        Always a dict with the keys ``document_id``, ``file_id``,
        ``has_component_interface``, ``component_interface_type``,
        ``file_exists``, ``file_info`` and ``error_details``. When the file is
        found via ``self.interfaceDbComponent``, ``file_info`` holds
        ``fileName``/``fileSize``/``mimeType`` (``'N/A'`` for missing
        attributes). Never raises: failures are described in ``error_details``.
    """
    diagnosis = {
        'document_id': getattr(document, 'id', 'unknown'),
        'file_id': getattr(document, 'fileId', 'unknown'),
        'has_component_interface': False,
        'component_interface_type': None,
        'file_exists': False,
        'file_info': None,
        'error_details': None,
    }
    try:
        # getattr: the private attribute may be absent entirely, not just None.
        componentInterface = getattr(document, '_componentInterface', None)
        diagnosis['has_component_interface'] = componentInterface is not None
        diagnosis['component_interface_type'] = type(componentInterface).__name__ if componentInterface else None

        if not componentInterface:
            diagnosis['error_details'] = "Component interface not set - document cannot access file system"
            return diagnosis

        # Try to access the file directly through the service's component interface.
        fileId = diagnosis['file_id']
        try:
            fileInfo = self.interfaceDbComponent.getFile(fileId)
            if fileInfo:
                diagnosis['file_exists'] = True
                diagnosis['file_info'] = {
                    'fileName': getattr(fileInfo, 'fileName', 'N/A'),
                    'fileSize': getattr(fileInfo, 'fileSize', 'N/A'),
                    'mimeType': getattr(fileInfo, 'mimeType', 'N/A'),
                }
            else:
                diagnosis['error_details'] = f"File with ID {fileId} not found in component interface"
        except Exception as e:
            diagnosis['error_details'] = f"Error accessing file {fileId}: {str(e)}"
        return diagnosis
    except Exception as e:
        diagnosis['error_details'] = f"Error during diagnosis: {str(e)}"
        return diagnosis
|
|
|
|
def _recoverDocumentAccess(self, document: ChatDocument) -> bool:
    """Try to restore a document's file access by re-attaching the component interface.

    Calls ``document.setComponentInterface(self.interfaceDbComponent)`` and then
    probes ``document.fileName`` to check that access works again.

    Args:
        document: The document to repair.

    Returns:
        True if the probe succeeds afterwards, False otherwise. Never raises,
        even for a ``None`` or malformed document.
    """
    # Resolve the id up front so logging in the error paths cannot itself fail.
    documentId = getattr(document, 'id', None)
    try:
        logger.info(f"Attempting to recover document access for document {documentId}")
        document.setComponentInterface(self.interfaceDbComponent)
    except Exception as e:
        logger.error(f"Error during document access recovery for {documentId}: {str(e)}")
        return False

    try:
        # fileName is probed as a representative property that needs file access.
        probedName = document.fileName
    except Exception as e:
        logger.error(f"Document access recovery failed for {documentId}: {str(e)}")
        return False

    logger.info(f"Document access recovered for {documentId} -> {probedName}")
    return True
|
|
|
|
def calculateObjectSize(self, obj: Any) -> int:
    """Estimate an object's size as the UTF-8 byte length of its JSON encoding.

    Non-ASCII characters are kept verbatim (``ensure_ascii=False``), and values
    JSON cannot encode natively are converted with ``str``.

    Args:
        obj: Object to measure.

    Returns:
        Size in bytes; 0 for ``None`` or when serialization fails (the error
        is logged).
    """
    import json

    if obj is None:
        return 0
    try:
        encoded = json.dumps(obj, ensure_ascii=False, default=str)
        return len(encoded.encode('utf-8'))
    except Exception as e:
        logger.error(f"Error calculating object size: {str(e)}")
        return 0
|
|
|
|
def getWorkflowContext(self) -> Dict[str, int]:
    """Return the active workflow's round/task/action counters for document generation.

    Each counter falls back to 0 when the attribute is absent. With no active
    workflow, or on any error (logged), all three counters are 0.
    """
    fields = ('currentRound', 'currentTask', 'currentAction')
    try:
        wf = self.services.workflow
        if not wf:
            return {name: 0 for name in fields}
        return {name: getattr(wf, name, 0) for name in fields}
    except Exception as e:
        logger.error(f"Error getting workflow context: {str(e)}")
        return {name: 0 for name in fields}
|
|
|
|
def setWorkflowContext(self, roundNumber: Optional[int] = None, taskNumber: Optional[int] = None, actionNumber: Optional[int] = None):
    """Update the active workflow's round/task/action counters in memory and in the DB.

    Only arguments that are not ``None`` are applied. The in-memory workflow is
    updated first; the changed fields are then persisted via
    ``interfaceDbChat.updateWorkflow``. Errors are logged, not raised.

    Args:
        roundNumber: New ``currentRound``, or None to leave it unchanged.
        taskNumber: New ``currentTask``, or None to leave it unchanged.
        actionNumber: New ``currentAction``, or None to leave it unchanged.
    """
    try:
        workflow = self.services.workflow
        if not workflow:
            logger.error("setWorkflowContext: No workflow available")
            return

        requested = (
            ("currentRound", roundNumber),
            ("currentTask", taskNumber),
            ("currentAction", actionNumber),
        )
        updateData = {field: value for field, value in requested if value is not None}
        for field, value in updateData.items():
            setattr(workflow, field, value)

        # Persist changes to the database only when something changed.
        if updateData:
            self.interfaceDbChat.updateWorkflow(workflow.id, updateData)

        logger.debug(
            f"Updated workflow context: Round {getattr(workflow, 'currentRound', 'N/A')}, "
            f"Task {getattr(workflow, 'currentTask', 'N/A')}, "
            f"Action {getattr(workflow, 'currentAction', 'N/A')}"
        )
    except Exception as e:
        logger.error(f"Error setting workflow context: {str(e)}")
|
|
|
|
def getWorkflowStats(self) -> Dict[str, Any]:
    """Summarize the active workflow: context counters plus totals, status and id.

    The counters come from ``getWorkflowContext``. Totals default to 0 and
    status/id to ``'unknown'`` when there is no workflow or the attribute is
    absent. On any error an all-default summary is returned and the error
    is logged.
    """
    try:
        wf = self.services.workflow
        ctx = self.getWorkflowContext()
        summary = {
            'currentRound': ctx['currentRound'],
            'currentTask': ctx['currentTask'],
            'currentAction': ctx['currentAction'],
            'totalTasks': 0,
            'totalActions': 0,
            'workflowStatus': 'unknown',
            'workflowId': 'unknown',
        }
        if wf:
            summary['totalTasks'] = getattr(wf, 'totalTasks', 0)
            summary['totalActions'] = getattr(wf, 'totalActions', 0)
            summary['workflowStatus'] = getattr(wf, 'status', 'unknown')
            summary['workflowId'] = getattr(wf, 'id', 'unknown')
        return summary
    except Exception as e:
        logger.error(f"Error getting workflow stats: {str(e)}")
        return {
            'currentRound': 0,
            'currentTask': 0,
            'currentAction': 0,
            'totalTasks': 0,
            'totalActions': 0,
            'workflowStatus': 'unknown',
            'workflowId': 'unknown',
        }
|
|
|
|
def createWorkflow(self, workflowData: Dict[str, Any]):
    """Create a workflow through ``interfaceDbChat`` and return the interface's result.

    Failures are logged and re-raised unchanged.
    """
    try:
        created = self.interfaceDbChat.createWorkflow(workflowData)
    except Exception as e:
        logger.error(f"Error creating workflow: {str(e)}")
        raise
    return created
|
|
|
|
def updateWorkflow(self, workflowId: str, updateData: Dict[str, Any]):
    """Apply ``updateData`` to workflow ``workflowId`` through ``interfaceDbChat``.

    Returns the interface's result; failures are logged and re-raised unchanged.
    """
    try:
        updated = self.interfaceDbChat.updateWorkflow(workflowId, updateData)
    except Exception as e:
        logger.error(f"Error updating workflow: {str(e)}")
        raise
    return updated
|
|
|
|
def getWorkflow(self, workflowId: str):
    """Load a workflow by id through ``interfaceDbChat``.

    A falsy result is returned as-is (and logged as a warning); exceptions are
    logged and re-raised.
    """
    try:
        logger.debug(f"getWorkflow called with workflowId: {workflowId}")
        found = self.interfaceDbChat.getWorkflow(workflowId)
        if not found:
            logger.warning(f"getWorkflow returned None for workflowId: {workflowId}")
        else:
            logger.debug(f"getWorkflow returned workflow with ID: {found.id}")
        return found
    except Exception as e:
        logger.error(f"Error getting workflow: {str(e)}")
        raise
|
|
|
|
# === Service-level transactions (DB write-through + in-memory sync) ===
|
|
|
|
def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]) -> ChatMessage:
    """Persist a message with its documents and mirror it into ``workflow.messages``.

    The message data is stamped with ``workflow.id`` and passed, with the
    documents under the ``"documents"`` key, to ``interfaceDbChat.createMessage``.
    The returned message then replaces the in-memory message with the same id,
    or is appended when there is none.

    Args:
        workflow: In-memory workflow that owns the message.
        messageData: Message fields; copied, never mutated (``None`` -> ``{}``).
        documents: Documents to persist with the message (``None`` -> ``[]``).

    Returns:
        The created ChatMessage.

    Raises:
        ValueError: If the interface returns no message.
    """
    # Copy so the caller's dict is never mutated; force the owning workflow id.
    messageDataWithDocs = dict(messageData or {})
    messageDataWithDocs["workflowId"] = workflow.id
    messageDataWithDocs["documents"] = documents or []

    chatMessage = self.interfaceDbChat.createMessage(messageDataWithDocs)
    if not chatMessage:
        raise ValueError("Failed to create message with documents")

    # An uninitialised message list must not make the in-memory sync crash
    # after the DB write has already succeeded.
    if not hasattr(workflow, 'messages') or workflow.messages is None:
        workflow.messages = []

    # In-memory sync: replace-by-id if present, otherwise append.
    chatMessageId = getattr(chatMessage, 'id', None)
    for index, existing in enumerate(workflow.messages):
        if getattr(existing, 'id', None) == chatMessageId:
            workflow.messages[index] = chatMessage
            break
    else:
        workflow.messages.append(chatMessage)
    return chatMessage
|
|
|
|
def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> ChatLog:
    """Persist a ChatLog and mirror it into ``workflow.logs``.

    The log data is stamped with ``workflow.id`` and created through
    ``interfaceDbChat.createLog``. The returned log then replaces the in-memory
    log with the same id, or is appended when there is none.

    Args:
        workflow: In-memory workflow that owns the log.
        logData: Log fields; copied, never mutated (``None`` -> ``{}``).

    Returns:
        The created ChatLog.

    Raises:
        ValueError: If the interface returns no log.
    """
    # Copy so the caller's dict is never mutated; force the owning workflow id.
    logData = dict(logData or {})
    logData["workflowId"] = workflow.id

    chatLog = self.interfaceDbChat.createLog(logData)
    if not chatLog:
        raise ValueError("Failed to create log")

    # Same guard as for workflow.stats: tolerate a missing/None logs list.
    if not hasattr(workflow, 'logs') or workflow.logs is None:
        workflow.logs = []

    # replace-by-id if exists, otherwise append
    chatLogId = getattr(chatLog, 'id', None)
    for index, existing in enumerate(workflow.logs):
        if getattr(existing, 'id', None) == chatLogId:
            workflow.logs[index] = chatLog
            break
    else:
        workflow.logs.append(chatLog)
    return chatLog
|
|
|
|
def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat:
    """Persist a workflow-level ChatStat built from an AI call response and append it to ``workflow.stats``.

    Args:
        workflow: In-memory workflow the stat belongs to.
        aiResponse: AiCallResponse-like object. ``modelName`` (stored as
            ``engine``), ``priceUsd``, ``processingTime``, ``bytesSent``,
            ``bytesReceived`` and ``errorCount`` are read from it.
        process: Label of the process that produced the response.

    Returns:
        The created ChatStat.

    Raises:
        ValueError: If the interface returns no stat, consistent with
            ``storeLog`` and ``storeMessageWithDocuments``.
        Exception: Any other failure is logged and re-raised.
    """
    try:
        statData = {
            "workflowId": workflow.id,
            "process": process,
            "engine": aiResponse.modelName,
            "priceUsd": aiResponse.priceUsd,
            "processingTime": aiResponse.processingTime,
            "bytesSent": aiResponse.bytesSent,
            "bytesReceived": aiResponse.bytesReceived,
            "errorCount": aiResponse.errorCount,
        }

        # Create the stat record in the database
        stat = self.interfaceDbChat.createStat(statData)
        if not stat:
            # Never leave a None placeholder in workflow.stats.
            raise ValueError("Failed to create stat")

        # Append to workflow stats list in memory
        if not hasattr(workflow, 'stats') or workflow.stats is None:
            workflow.stats = []
        workflow.stats.append(stat)
        return stat
    except Exception as e:
        logger.error(f"Failed to store workflow stat: {e}")
        raise
|
|
|
|
def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
    """Apply ``messageData`` to message ``messageId`` through ``interfaceDbChat``.

    Returns the interface's result; failures are logged and re-raised unchanged.
    """
    try:
        updated = self.interfaceDbChat.updateMessage(messageId, messageData)
    except Exception as e:
        logger.error(f"Error updating message: {str(e)}")
        raise
    return updated
|
|
|
|
def getDocumentCount(self) -> str:
    """Describe how many documents are attached across all messages of the active workflow.

    Used for task planning. Returns ``"<n> document(s) available"``, or
    ``"No documents available"`` when there is no workflow, no documents, or an
    error (which is logged).
    """
    emptyText = "No documents available"
    try:
        wf = self.services.workflow
        if not wf:
            return emptyText
        total = sum(
            len(msg.documents)
            for msg in wf.messages
            if getattr(msg, 'documents', None)
        )
        return f"{total} document(s) available" if total else emptyText
    except Exception as e:
        logger.error(f"Error getting document count: {str(e)}")
        return emptyText
|
|
|
|
def getWorkflowHistoryContext(self) -> str:
    """Describe documents from earlier workflow rounds for task planning.

    Produces a header line followed by one ``- docList:<label> (<n> documents)``
    line per history exchange from ``_getDocumentReferenceList``. Label-only
    references are used on purpose: labels stay stable across workflow state
    changes, whereas message ids can go stale.

    Returns ``"No previous round context available"`` when there is no workflow,
    no message with status ``"first"``, no history exchange, or on error (logged).
    """
    fallback = "No previous round context available"
    try:
        wf = self.services.workflow
        if not wf:
            return fallback

        # Heuristic: any message with status "first" means earlier rounds may exist.
        if not any(getattr(msg, 'status', None) == "first" for msg in wf.messages):
            return fallback

        history = self._getDocumentReferenceList(wf)["history"]
        if not history:
            return fallback

        lines = [
            f"- docList:{exchange['documentsLabel']} ({len(exchange['documents'])} documents)\n"
            for exchange in history
        ]
        return "Previous workflow rounds contain documents:\n" + "".join(lines)
    except Exception as e:
        logger.error(f"Error getting workflow history context: {str(e)}")
        return fallback
|
|
|
|
def getAvailableDocuments(self, workflow) -> str:
    """Render the workflow's document index for AI action planning.

    Current-round exchanges are listed first ("Current round documents"), then
    earlier rounds ("Past rounds documents"). Each exchange appears as a
    label-only ``docList:<label>`` reference (labels stay stable across workflow
    state changes; message ids can go stale), followed by its ``docItem:``
    entries. Entries lacking the ``docItem:`` prefix get it prepended.

    Args:
        workflow: Workflow to describe. It is used directly rather than
            reloaded from the database, to avoid filename truncation on reload.

    Returns:
        The formatted index. ``"No documents available"`` is returned if
        ``workflow`` is falsy or has no ``messages`` attribute. A
        "NO DOCUMENTS AVAILABLE" notice is returned when both sections are
        empty, or when an error occurs (the error is logged).
    """
    def _renderSection(title, exchanges):
        # One section: heading, then each exchange's docList ref and its docItem refs.
        chunks = [f"\n{title}:\n"]
        for exchange in exchanges:
            chunks.append(f"- docList:{exchange['documentsLabel']} contains:\n")
            for ref in exchange['documents']:
                itemRef = ref if ref.startswith("docItem:") else f"docItem:{ref}"
                chunks.append(f" - {itemRef}\n")
            chunks.append("\n")
        return "".join(chunks)

    try:
        if not workflow or not hasattr(workflow, 'messages'):
            return "No documents available"

        workflowId = workflow.id if hasattr(workflow, 'id') else 'NO_ID'
        logger.debug(f"getAvailableDocuments: workflow.id = {workflowId}, workflow object id = {id(workflow)}")

        # Root cause analysis: verify workflow.messages integrity and detect workflow changes
        self._verifyWorkflowMessagesIntegrity(workflow, workflowId)

        documentList = self._getDocumentReferenceList(workflow)
        chatExchanges = documentList["chat"]
        historyExchanges = documentList["history"]

        sections = []
        if chatExchanges:
            sections.append(_renderSection("Current round documents", chatExchanges))
        if historyExchanges:
            sections.append(_renderSection("Past rounds documents", historyExchanges))
        if not chatExchanges and not historyExchanges:
            sections.append("\nNO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n")
        return "".join(sections)
    except Exception as e:
        logger.error(f"Error getting available documents: {str(e)}")
        return "NO DOCUMENTS AVAILABLE - Error generating document context."
|
|
|
|
def _getDocumentReferenceList(self, workflow) -> Dict[str, List]:
    """Split the workflow's labelled documents into current-round and history exchanges.

    First, all attached documents are passed to ``_refreshDocumentFileAttributes``.
    Messages are then walked newest-first (reverse list order). Each message with
    a ``documentsLabel`` contributes one exchange
    ``{'documentsLabel': label, 'documents': [refs]}``. The exchange contains the
    message's valid documents (positive ``fileSize``, non-empty ``fileId`` and
    ``mimeType``) not already claimed by a newer message, so no document appears
    twice. Refs come from ``getDocumentReferenceFromChatDocument``.

    Exchanges whose message ``roundNumber`` equals ``workflow.currentRound`` go
    to ``"chat"``. All others go to ``"history"``, including every exchange when
    ``currentRound`` is unknown.

    Returns:
        ``{"chat": [...], "history": [...]}``, each list newest-first.
    """
    messages = workflow.messages

    # Refresh file attributes once for every attached document.
    allDocuments = [doc for message in messages if message.documents for doc in message.documents]
    if allDocuments:
        self._refreshDocumentFileAttributes(allDocuments)

    def _isValidDocument(doc) -> bool:
        # fileSize may be a computed property, so it is read only once.
        try:
            fileSize = getattr(doc, 'fileSize', 0)
            sizeOk = bool(fileSize) and fileSize > 0
            return sizeOk and bool(getattr(doc, 'fileId', None)) and bool(getattr(doc, 'mimeType', None))
        except Exception:
            return False

    currentRound = getattr(workflow, 'currentRound', None)
    chatExchanges = []
    historyExchanges = []
    seenDocIds = set()

    for message in reversed(messages):
        documents = getattr(message, 'documents', None)
        if not documents:
            continue
        label = getattr(message, 'documentsLabel', None)
        if not label:
            # Skip messages without a label to keep references consistent.
            continue

        docRefs = []
        for doc in documents:
            if not _isValidDocument(doc):
                continue
            docId = getattr(doc, 'id', None)
            # Each document belongs to exactly one exchange (the newest one).
            if not docId or docId in seenDocIds:
                continue
            seenDocIds.add(docId)
            docRefs.append(self.getDocumentReferenceFromChatDocument(doc))
        if not docRefs:
            continue

        entry = {'documentsLabel': label, 'documents': docRefs}
        if currentRound is not None and getattr(message, 'roundNumber', None) == currentRound:
            chatExchanges.append(entry)
        else:
            historyExchanges.append(entry)

    return {"chat": chatExchanges, "history": historyExchanges}
|
|
|
|
def _refreshDocumentFileAttributes(self, documents) -> None:
    """Update file attributes (fileName, fileSize, mimeType) of documents in place.

    Each document's attributes are reloaded from ``self.getFileInfo(doc.fileId)``.
    The following documents are marked invalid:

    * no ``fileId``: ``fileSize = 0`` and ``mimeType = None``;
    * file info not found: ``fileSize = 0`` and ``mimeType = None``;
    * file info found but no ``mimeType``: ``fileSize = 0``.

    Marking them this way makes the document index filter them out. An error on one
    document is logged and does not stop the remaining documents from being processed.

    Args:
        documents: Iterable of ChatDocument-like objects to update in place.
    """
    for doc in documents:
        # Resolve the id defensively so the error log below can never raise itself.
        doc_id = getattr(doc, 'id', None)
        try:
            # Skip invalid docs early if essential identifiers are missing
            if not getattr(doc, 'fileId', None):
                logger.debug(f"Skipping document {doc_id} due to missing fileId")
                doc.fileSize = 0
                doc.mimeType = None
                continue

            file_info = self.getFileInfo(doc.fileId)
            if not file_info:
                logger.warning(f"File not found for document {doc_id}, fileId: {doc.fileId}")
                doc.fileSize = 0
                doc.mimeType = None
                continue

            doc.fileName = file_info.get("fileName", doc.fileName)
            doc.fileSize = file_info.get("size", doc.fileSize)
            doc.mimeType = file_info.get("mimeType", doc.mimeType)

            # Mark invalid if missing mimeType
            if not doc.mimeType:
                logger.debug(f"Document {doc_id} has missing mimeType; will be filtered from index")
                doc.fileSize = 0
        except Exception as e:
            logger.error(f"Error refreshing file attributes for document {doc_id}: {e}")
|
|
|
|
def _generateWorkflowContextPrefix(self, message) -> str:
    """Build the ``round{n}_task{n}_action{n}`` prefix locating a message in the workflow.

    A missing ``roundNumber`` defaults to 1; a missing ``taskNumber`` or
    ``actionNumber`` defaults to 0. Attributes that exist are used as stored,
    even when their value is ``None``.
    """
    rnd = getattr(message, 'roundNumber', 1)
    task = getattr(message, 'taskNumber', 0)
    action = getattr(message, 'actionNumber', 0)
    return f"round{rnd}_task{task}_action{action}"
|
|
|
|
def getDocumentReferenceFromChatDocument(self, document) -> str:
    """Build the ``docItem:<id>:<fileName>`` reference string for a chat document.

    Args:
        document: ChatDocument-like object with ``id`` and ``fileName`` attributes.

    Returns:
        The reference string ``docItem:<id>:<fileName>``.

    Raises:
        Any exception raised while building the reference (typically
        ``AttributeError`` for a missing attribute). It is logged and re-raised
        unchanged so the workflow does not continue with invalid data.
    """
    try:
        # Use document ID and filename for simple reference
        return f"docItem:{document.id}:{document.fileName}"
    except Exception as e:
        # Look the id up defensively. The failure may be the missing id itself, and
        # raising inside this handler would mask the original exception.
        docId = getattr(document, 'id', None)
        logger.error(f"Critical error creating document reference for document {docId}: {str(e)}")
        # Re-raise the error to prevent workflow from continuing with invalid data
        raise
|
|
|
|
def _getMessageSequenceForExchange(self, exchange, workflow) -> int:
    """Return the ``sequenceNr`` of the message an exchange comes from, for recency sorting.

    Only the exchange's first document reference is inspected:

    * ``docItem:<documentId>:<fileName>``: the message containing that document.
    * ``docList:<messageId>:<label>``: the message with that id.
    * ``docList:<label>`` (label-only form): if no message id matches, the first
      message whose ``documentsLabel`` equals the label.

    Ids are compared as strings on both branches, because references are text while
    message/document ids may be other types.

    Returns:
        The message's ``sequenceNr``. Returns 0 if the reference cannot be resolved,
        the message has no (or a ``None``) sequence number, or an error occurs
        (errors are logged).
    """
    def _sequence_of(message) -> int:
        # Keep the declared int contract even when sequenceNr is absent or None
        seq = getattr(message, 'sequenceNr', 0)
        return seq if seq is not None else 0

    try:
        documents = exchange['documents']
        if not documents:
            return 0
        first_doc_ref = documents[0]
        messages = getattr(workflow, 'messages', None) or []

        if first_doc_ref.startswith("docItem:"):
            # docItem:<id>:<fileName> - extract ID
            parts = first_doc_ref.split(':')
            if len(parts) >= 2:
                doc_id = parts[1]
                # Find the message containing this document
                for message in messages:
                    for doc in (message.documents or []):
                        if str(doc.id) == doc_id:
                            return _sequence_of(message)
        elif first_doc_ref.startswith("docList:"):
            # docList:<message_id>:<label> - extract message ID
            parts = first_doc_ref.split(':')
            if len(parts) >= 2:
                message_id = parts[1]
                for message in messages:
                    if str(message.id) == message_id:
                        return _sequence_of(message)
                # Fall back to the label-only form docList:<label>
                label = first_doc_ref[len("docList:"):]
                for message in messages:
                    if getattr(message, 'documentsLabel', None) == label:
                        return _sequence_of(message)
        return 0
    except Exception as e:
        logger.error(f"Error getting message sequence for exchange: {str(e)}")
        return 0
|
|
|
|
def _validateDocumentLabelConsistency(self, message) -> Optional[str]:
    """Return the message's own ``documentsLabel`` for use in document references.

    No correction is applied; the stored label is returned as-is.

    Returns:
        The label, or ``None`` when the message has no ``documentsLabel`` attribute
        or its value is empty/falsy.
    """
    label = getattr(message, 'documentsLabel', None)
    return label if label else None
|
|
|
|
def getConnectionReferenceList(self) -> List[str]:
    """Return one reference string per connection of the current user.

    Connections are loaded with
    ``self.services.interfaceDbApp.getUserConnections(self.services.user.id)``. Each
    one is formatted by ``getConnectionReferenceFromUserConnection``; the original
    notes say this format is the one expected by
    ``getUserConnectionFromConnectionReference()``.

    Returns an empty list in three cases: the services object lacks
    ``interfaceDbApp`` or ``user``, the user has no connections, or any error
    occurs (errors are logged).
    """
    try:
        services = self.services
        if not (hasattr(services, 'interfaceDbApp') and hasattr(services, 'user')):
            return []

        userId = services.user.id
        connections = services.interfaceDbApp.getUserConnections(userId)
        if not connections:
            return []

        # Reference format: connection:{authority}:{username} [status:..., token:...]
        return [self.getConnectionReferenceFromUserConnection(conn) for conn in connections]
    except Exception as e:
        logger.error(f"Error getting connection reference list: {str(e)}")
        return []
|
|
|
|
|
|
def _getProgressLogger(self):
    """Return the ProgressLogger shared by the progressLog* wrappers.

    The instance is created on first use and cached in ``self._progressLogger``.
    Later calls return the cached object.
    """
    if self._progressLogger is not None:
        return self._progressLogger
    self._progressLogger = ProgressLogger(self.services)
    return self._progressLogger
|
|
|
|
def createProgressLogger(self) -> ProgressLogger:
    """Create and return a new ProgressLogger bound to ``self.services``.

    Unlike ``_getProgressLogger``, the instance is not cached on ``self``. Each
    call returns a fresh, independent logger.
    """
    freshLogger = ProgressLogger(self.services)
    return freshLogger
|
|
|
|
def progressLogStart(self, operationId: str, serviceName: str, actionName: str, context: str = ""):
    """Start tracking an operation on the shared ProgressLogger.

    Delegates to ``ProgressLogger.startOperation`` and returns its result unchanged.
    """
    return self._getProgressLogger().startOperation(operationId, serviceName, actionName, context)
|
|
|
|
def progressLogUpdate(self, operationId: str, progress: float, statusUpdate: str = ""):
    """Report progress for a tracked operation on the shared ProgressLogger.

    Delegates to ``ProgressLogger.updateOperation`` and returns its result unchanged.
    """
    return self._getProgressLogger().updateOperation(operationId, progress, statusUpdate)
|
|
|
|
def progressLogFinish(self, operationId: str, success: bool = True):
    """Finish a tracked operation on the shared ProgressLogger.

    Delegates to ``ProgressLogger.finishOperation`` and returns its result unchanged.
    """
    return self._getProgressLogger().finishOperation(operationId, success)
|
|
|