909 lines
No EOL
47 KiB
Python
909 lines
No EOL
47 KiB
Python
import logging
|
|
import uuid
|
|
from typing import Dict, Any, List, Optional
|
|
from modules.datamodels.datamodelUam import User, UserConnection
|
|
from modules.datamodels.datamodelChat import ChatDocument, ChatMessage
|
|
from modules.datamodels.datamodelChat import ChatContentExtracted
|
|
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
|
|
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
from modules.services.serviceAi.mainServiceAi import AiService
|
|
from modules.security.tokenManager import TokenManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class WorkflowService:
|
|
"""Service class containing methods for document processing, chat operations, and workflow management"""
|
|
|
|
def __init__(self, serviceCenter):
|
|
self.services = serviceCenter
|
|
self.user = serviceCenter.user
|
|
self.workflow = serviceCenter.workflow
|
|
self.interfaceDbChat = serviceCenter.interfaceDbChat
|
|
self.interfaceDbComponent = serviceCenter.interfaceDbComponent
|
|
self.interfaceDbApp = serviceCenter.interfaceDbApp
|
|
|
|
async def summarizeChat(self, messages: List[ChatMessage]) -> str:
    """
    Ask the AI service for a summary of the most recent stretch of a chat.

    The messages are walked from newest to oldest and collected up to and
    including the first one whose ``status`` is "first"; they are then put
    into the prompt in their original (oldest-first) order.

    Args:
        messages: List of chat messages to summarize

    Returns:
        str: Whatever ``AiService.callAi`` returns for the prompt. If anything
        raises, the error is logged and a string of the form
        "Error summarizing chat: <error>" is returned instead.
    """
    try:
        # Newest-to-oldest scan; the message marked "first" is included and ends the scan.
        collected = []
        for entry in reversed(messages):
            collected.append(entry)
            if entry.status == "first":
                break

        # The prompt lists the collected messages oldest first.
        language = self.user.language
        history = "\n".join(f"- {entry.message}" for entry in reversed(collected))

        prompt = f"""You are an AI assistant providing a summary of a chat conversation.
Please respond in '{language}' language.

Chat History:
{history}

Instructions:
1. Summarize the conversation's key points and outcomes
2. Be concise but informative
3. Use a professional but friendly tone
4. Focus on important decisions and next steps if any

Please provide a comprehensive summary of this conversation."""

        # Call the AI service directly (avoiding circular dependency)
        call_options = {
            "process_type": "text",
            "operation_type": "generate_content",
            "priority": "speed",
            "compress_prompt": True,
            "compress_documents": False,
            "max_cost": 0.01
        }
        ai_service = AiService(self)
        return await ai_service.callAi(prompt=prompt, documents=None, options=call_options)

    except Exception as e:
        logger.error(f"Error summarizing chat: {str(e)}")
        return f"Error summarizing chat: {str(e)}"
|
|
|
|
def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
    """Get ChatDocuments from a list of document references using all three formats.

    Each reference is resolved against the messages of
    ``self.services.currentWorkflow``:

    * ``docItem:<id>:<filename>`` - documents whose ``id`` equals ``<id>``.
    * ``docList:<messageId>:<label>`` - all documents of the message with that
      id, or ``docList:<label>`` - all documents of the newest message whose
      ``documentsLabel`` equals ``<label>``.
    * a bare label starting with ``round`` (e.g.
      ``round1_task2_action3_contextinfo``) - all documents of the newest
      message whose ``documentsLabel`` equals the whole reference.

    References of any other shape are ignored.

    Args:
        documentList: Reference strings in the formats above.

    Returns:
        The resolved documents in reference order. If anything raises while
        resolving - including the ValueError raised below for a docList/round
        reference that matches no message - the error is logged and an empty
        list is returned.
    """
    try:
        workflow = self.services.currentWorkflow
        logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}")

        all_documents = []
        for doc_ref in documentList:
            if doc_ref.startswith("docItem:"):
                # docItem:<id>:<filename> - extract ID and find document
                parts = doc_ref.split(':')
                if len(parts) >= 2:
                    doc_id = parts[1]
                    # Find the document by ID; a docItem that matches nothing is skipped without error
                    for message in workflow.messages:
                        if message.documents:
                            for doc in message.documents:
                                if doc.id == doc_id:
                                    doc_name = getattr(doc, 'fileName', 'unknown')
                                    logger.debug(f"Found docItem reference {doc_ref}: {doc_name}")
                                    all_documents.append(doc)
                                    break
            elif doc_ref.startswith("docList:"):
                # docList:<messageId>:<label> or docList:<label> - extract message ID and find document list
                parts = doc_ref.split(':')
                if len(parts) >= 3:
                    # Format: docList:<messageId>:<label>
                    message_id = parts[1]
                    label = parts[2]
                    logger.debug(f"Looking for message with ID: {message_id} and label: {label}")

                    # Find the message by ID and get all its documents
                    # (the label is only used for logging and error text in this branch)
                    message_found = False
                    for message in workflow.messages:
                        logger.debug(f"Checking message ID: {message.id} (looking for: {message_id})")
                        # Message ids are compared as strings
                        if str(message.id) == message_id:
                            message_found = True
                            logger.debug(f"Found message {message.id} with documentsLabel: {getattr(message, 'documentsLabel', 'None')}")
                            if message.documents:
                                doc_names = [doc.fileName for doc in message.documents if hasattr(doc, 'fileName')]
                                logger.debug(f"Found docList reference {doc_ref}: {len(message.documents)} documents - {doc_names}")
                                all_documents.extend(message.documents)
                            else:
                                logger.debug(f"Found docList reference {doc_ref} but message has no documents")
                            break

                    if not message_found:
                        available_ids = [str(msg.id) for msg in workflow.messages]
                        logger.error(f"Message with ID {message_id} not found in workflow. Available message IDs: {available_ids}")
                        # Caught by the outer except below, which returns []
                        raise ValueError(f"Document reference not found: docList:{message_id}:{label}")
                elif len(parts) >= 2:
                    # Format: docList:<label> - find message by documentsLabel
                    label = parts[1]
                    logger.debug(f"Looking for message with documentsLabel: {label}")
                    # Find messages with matching documentsLabel
                    matching_messages = []
                    for message in workflow.messages:
                        # Read documentsLabel via getattr (None when the attribute is absent)
                        msg_label = getattr(message, 'documentsLabel', None)
                        if msg_label == label:
                            matching_messages.append(message)
                            logger.debug(f"Found message {message.id} with matching documentsLabel: {msg_label}")
                        else:
                            # Debug: show what labels we're comparing
                            logger.debug(f"Message {message.id} has documentsLabel: '{msg_label}' (looking for: '{label}')")

                    if matching_messages:
                        # Use the newest message (highest publishedAt; a missing publishedAt sorts as 0)
                        matching_messages.sort(key=lambda msg: getattr(msg, 'publishedAt', 0), reverse=True)
                        newest_message = matching_messages[0]

                        if newest_message.documents:
                            doc_names = [doc.fileName for doc in newest_message.documents if hasattr(doc, 'fileName')]
                            logger.debug(f"Found docList reference {doc_ref}: {len(newest_message.documents)} documents - {doc_names}")
                            all_documents.extend(newest_message.documents)
                        else:
                            logger.debug(f"Found docList reference {doc_ref} but message has no documents")
                    else:
                        logger.error(f"No messages found with documentsLabel: {label}")
                        # Caught by the outer except below, which returns []
                        raise ValueError(f"Document reference not found: docList:{label}")
            else:
                # Direct label reference (round1_task2_action3_contextinfo)
                # Search for messages with matching documentsLabel to find the actual documents
                if doc_ref.startswith("round"):
                    # Parse round/task/action to find the corresponding document list
                    label_parts = doc_ref.split('_', 3)
                    if len(label_parts) >= 4:
                        # The parsed numbers are only used in the debug message below;
                        # a non-numeric part raises ValueError here, which the outer except turns into []
                        round_num = int(label_parts[0].replace('round', ''))
                        task_num = int(label_parts[1].replace('task', ''))
                        action_num = int(label_parts[2].replace('action', ''))
                        context_info = label_parts[3]

                        logger.debug(f"Resolving round reference: round{round_num}_task{task_num}_action{action_num}_{context_info}")
                        logger.debug(f"Looking for messages with documentsLabel matching: {doc_ref}")

                        # Find messages whose documentsLabel equals the whole reference
                        # In case of retries, we want the NEWEST message (most recent publishedAt)
                        matching_messages = []
                        for message in workflow.messages:
                            msg_documents_label = getattr(message, 'documentsLabel', '')

                            # Check if this message's documentsLabel matches our reference
                            if msg_documents_label == doc_ref:
                                # Found a matching message, collect it for comparison
                                matching_messages.append(message)
                                logger.debug(f"Found message {message.id} with matching documentsLabel: {msg_documents_label}")

                        # If we found matching messages, take the newest one (highest publishedAt)
                        if matching_messages:
                            # Sort by publishedAt descending (newest first)
                            matching_messages.sort(key=lambda msg: getattr(msg, 'publishedAt', 0), reverse=True)
                            newest_message = matching_messages[0]

                            logger.debug(f"Found {len(matching_messages)} matching messages, using newest: {newest_message.id} (publishedAt: {getattr(newest_message, 'publishedAt', 'unknown')})")
                            logger.debug(f"Newest message has {len(newest_message.documents) if newest_message.documents else 0} documents")

                            if newest_message.documents:
                                doc_names = [doc.fileName for doc in newest_message.documents if hasattr(doc, 'fileName')]
                                logger.debug(f"Added {len(newest_message.documents)} documents from newest message {newest_message.id}: {doc_names}")
                                all_documents.extend(newest_message.documents)
                            else:
                                logger.debug(f"No documents found in newest message {newest_message.id}")
                        else:
                            logger.error(f"No messages found with documentsLabel: {doc_ref}")
                            # Caught by the outer except below, which returns []
                            raise ValueError(f"Document reference not found: {doc_ref}")

        logger.debug(f"Resolved {len(all_documents)} documents from document list: {documentList}")
        return all_documents
    except Exception as e:
        # Any failure discards documents resolved so far and yields an empty list
        logger.error(f"Error getting documents from document list: {str(e)}")
        return []
|
|
|
|
def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str:
    """Get connection reference from UserConnection with enhanced state information.

    The reference has the form
    ``connection:<authority>:<externalUsername> [status:<status>, token:<token_status>]``
    where ``<token_status>`` is one of "no_token", "no_expiration", "expired",
    "valid (refreshed)", "valid" or "error: <message>".

    A token is only reported as "valid (refreshed)" when it actually carries a
    ``createdAt`` value less than five minutes old. A token without a usable
    ``createdAt`` is reported as plain "valid" (previously a missing attribute
    was treated as age 0 and therefore always shown as refreshed).

    Args:
        connection: Connection providing ``id``, ``authority.value``,
            ``externalUsername`` and ``status.value``.

    Returns:
        str: The reference string including the bracketed state suffix.
    """
    refreshed_window_seconds = 300  # 5 minutes

    token_status = "unknown"
    try:
        # Get a fresh token via TokenManager convenience method
        logger.debug(f"Getting fresh token for connection {connection.id}")
        token = TokenManager().getFreshToken(connection.id)

        if not token:
            token_status = "no_token"
        elif not getattr(token, 'expiresAt', None):
            token_status = "no_expiration"
        else:
            current_time = get_utc_timestamp()
            logger.debug(f"getConnectionReferenceFromUserConnection: Current time: {current_time}")
            logger.debug(f"getConnectionReferenceFromUserConnection: Token expires at: {token.expiresAt}")

            if current_time > token.expiresAt:
                token_status = "expired"
            else:
                # Only claim "refreshed" when the creation time is known and recent
                created_at = getattr(token, 'createdAt', None)
                if created_at is not None and current_time - created_at < refreshed_window_seconds:
                    token_status = "valid (refreshed)"
                else:
                    token_status = "valid"
    except Exception as e:
        # Token lookup problems are reported inside the reference instead of failing the call
        token_status = f"error: {str(e)}"

    # Build enhanced reference with state information
    # Format: connection:msft:<username> (without UUID)
    base_ref = f"connection:{connection.authority.value}:{connection.externalUsername}"
    state_info = f" [status:{connection.status.value}, token:{token_status}]"

    logger.debug(f"getConnectionReferenceFromUserConnection: Built reference: {base_ref + state_info}")
    return base_ref + state_info
|
|
|
|
def getUserConnectionByExternalUsername(self, authority: str, externalUsername: str) -> Optional[UserConnection]:
    """Look up one of the current user's connections by authority and external username.

    The authority is compared case-insensitively and may be stored on the
    connection either as an object with a ``value`` attribute or as a plain
    value; the external username must match exactly.

    Args:
        authority: Authority name to look for.
        externalUsername: External username to look for.

    Returns:
        The first matching connection, or None when either argument is empty,
        nothing matches, or the lookup raises (the error is logged).
    """
    try:
        if not (authority and externalUsername):
            return None

        for candidate in self.interfaceDbApp.getUserConnections(self.user.id):
            # Normalize authority for comparison (enum vs string)
            if hasattr(candidate.authority, 'value'):
                authority_text = candidate.authority.value
            else:
                authority_text = str(candidate.authority)

            if authority_text.lower() != authority.lower():
                continue
            if candidate.externalUsername == externalUsername:
                return candidate

        return None
    except Exception as e:
        logger.error(f"Error getting connection by external username: {str(e)}")
        return None
|
|
|
|
def getUserConnectionFromConnectionReference(self, connectionReference: str) -> Optional[UserConnection]:
    """Get UserConnection from reference string (handles new format without UUID).

    Accepted format: ``connection:<authority>:<username>`` optionally followed
    by a bracketed state suffix such as `` [status:..., token:...]``, which is
    ignored.

    Only the first two ':' separate fields, so usernames that themselves
    contain ':' resolve correctly. The authority is compared
    case-insensitively and may be stored either as an object with a ``value``
    attribute or as a plain value, matching ``getUserConnectionByExternalUsername``.

    Args:
        connectionReference: Reference string as described above.

    Returns:
        The first matching connection of the current user, or None when the
        reference is malformed, nothing matches, or the lookup raises (the
        error is logged).
    """
    try:
        # Remove state information if present
        base_reference = connectionReference.split(' [')[0]

        # maxsplit=2 keeps any further ':' inside the username
        parts = base_reference.split(':', 2)
        if len(parts) != 3 or parts[0] != "connection":
            return None

        authority = parts[1]
        username = parts[2]

        # Get user connections through AppObjects interface
        user_connections = self.interfaceDbApp.getUserConnections(self.user.id)

        # Find matching connection by authority and username (no UUID needed)
        for conn in user_connections:
            raw_authority = conn.authority
            conn_authority = raw_authority.value if hasattr(raw_authority, 'value') else str(raw_authority)
            if conn_authority.lower() == authority.lower() and conn.externalUsername == username:
                return conn
        return None

    except Exception as e:
        logger.error(f"Error parsing connection reference: {str(e)}")
        return None
|
|
|
|
def getFileInfo(self, fileId: str) -> Dict[str, Any]:
    """Return the metadata of a stored file as a plain dictionary.

    Args:
        fileId: Identifier passed to ``interfaceDbComponent.getFile``.

    Returns:
        A dict with the keys "id", "fileName", "size", "mimeType", "fileHash"
        and "creationDate", or None when the lookup yields nothing.
    """
    record = self.interfaceDbComponent.getFile(fileId)
    if not record:
        return None

    info = {}
    info["id"] = record.id
    info["fileName"] = record.fileName
    # Note the key rename: the stored attribute is fileSize, the key is "size".
    info["size"] = record.fileSize
    info["mimeType"] = record.mimeType
    info["fileHash"] = record.fileHash
    info["creationDate"] = record.creationDate
    return info
|
|
|
|
def getFileData(self, fileId: str) -> bytes:
    """Fetch the content of a file by delegating to the component interface.

    Args:
        fileId: Identifier passed to ``interfaceDbComponent.getFileData``.

    Returns:
        Whatever the component interface returns for that id.
    """
    component_store = self.interfaceDbComponent
    return component_store.getFileData(fileId)
|
|
|
|
def _diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]:
|
|
"""
|
|
Diagnose document access issues and provide recovery information.
|
|
This method helps identify why document properties are inaccessible.
|
|
"""
|
|
try:
|
|
diagnosis = {
|
|
'document_id': document.id,
|
|
'file_id': document.fileId,
|
|
'has_component_interface': document._componentInterface is not None,
|
|
'component_interface_type': type(document._componentInterface).__name__ if document._componentInterface else None,
|
|
'file_exists': False,
|
|
'file_info': None,
|
|
'error_details': None
|
|
}
|
|
|
|
# Check if component interface is set
|
|
if not document._componentInterface:
|
|
diagnosis['error_details'] = "Component interface not set - document cannot access file system"
|
|
return diagnosis
|
|
|
|
# Try to access the file directly
|
|
try:
|
|
file_info = self.interfaceDbComponent.getFile(document.fileId)
|
|
if file_info:
|
|
diagnosis['file_exists'] = True
|
|
diagnosis['file_info'] = {
|
|
'fileName': file_info.fileName if hasattr(file_info, 'fileName') else 'N/A',
|
|
'fileSize': file_info.fileSize if hasattr(file_info, 'fileSize') else 'N/A',
|
|
'mimeType': file_info.mimeType if hasattr(file_info, 'mimeType') else 'N/A'
|
|
}
|
|
else:
|
|
diagnosis['error_details'] = f"File with ID {document.fileId} not found in component interface"
|
|
except Exception as e:
|
|
diagnosis['error_details'] = f"Error accessing file {document.fileId}: {str(e)}"
|
|
|
|
return diagnosis
|
|
|
|
except Exception as e:
|
|
return {
|
|
'document_id': document.id if hasattr(document, 'id') else 'unknown',
|
|
'file_id': document.fileId if hasattr(document, 'fileId') else 'unknown',
|
|
'error_details': f"Error during diagnosis: {str(e)}"
|
|
}
|
|
|
|
def _recoverDocumentAccess(self, document: ChatDocument) -> bool:
    """
    Try to restore access to a document by re-attaching the component interface.

    After calling ``document.setComponentInterface`` with this service's
    component interface, reading ``document.fileName`` is used as the probe.

    Returns:
        True when the probe succeeds, False when the probe or the
        re-attachment raises (both cases are logged).
    """
    try:
        logger.info(f"Attempting to recover document access for document {document.id}")

        # Hand the document this service's component interface again.
        component_store = self.interfaceDbComponent
        document.setComponentInterface(component_store)

        # Probe: the file name must now be readable.
        try:
            probed_name = document.fileName
            logger.info(f"Document access recovered for {document.id} -> {probed_name}")
            return True
        except Exception as probe_error:
            logger.error(f"Document access recovery failed for {document.id}: {str(probe_error)}")
            return False

    except Exception as e:
        logger.error(f"Error during document access recovery for {document.id}: {str(e)}")
        return False
|
|
|
|
def calculateObjectSize(self, obj: Any) -> int:
    """
    Calculate the size of an object in bytes.

    The size is the length of the object's JSON serialization encoded as
    UTF-8. Non-ASCII characters are written unescaped, and values that JSON
    cannot represent natively are serialized via ``str()``.

    Args:
        obj: Object to calculate size for

    Returns:
        int: Size in bytes; 0 for None or when serialization fails (the
        failure is logged)
    """
    # None has no payload; answer before doing any work.
    if obj is None:
        return 0

    try:
        import json

        # Convert object to JSON string and calculate size
        serialized = json.dumps(obj, ensure_ascii=False, default=str)
        return len(serialized.encode('utf-8'))

    except Exception as e:
        logger.error(f"Error calculating object size: {str(e)}")
        return 0
|
|
|
|
def getWorkflowContext(self) -> Dict[str, int]:
    """Report the round/task/action position stored on ``self.workflow``.

    Returns:
        A dict with 'currentRound', 'currentTask' and 'currentAction'. An
        attribute missing on the workflow is reported as 0; if reading fails
        altogether, the error is logged and all three are 0.
    """
    try:
        position = {}
        for field in ('currentRound', 'currentTask', 'currentAction'):
            position[field] = getattr(self.workflow, field, 0)
        return position
    except Exception as e:
        logger.error(f"Error getting workflow context: {str(e)}")
        return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}
|
|
|
|
def setWorkflowContext(self, round_number: int = None, task_number: int = None, action_number: int = None):
    """Update the round/task/action position of the current workflow.

    Only arguments that are not None are applied. Each applied value is set
    on ``self.services.currentWorkflow`` and, if at least one value was
    applied, sent to ``interfaceDbChat.updateWorkflow`` in a single call.
    Errors are logged and not re-raised.

    Args:
        round_number: New value for ``currentRound``, or None to leave it.
        task_number: New value for ``currentTask``, or None to leave it.
        action_number: New value for ``currentAction``, or None to leave it.
    """
    try:
        workflow = self.services.currentWorkflow

        requested = (
            ("currentRound", round_number),
            ("currentTask", task_number),
            ("currentAction", action_number),
        )

        # Apply in memory and collect the same values for persistence.
        update_data = {}
        for field, value in requested:
            if value is None:
                continue
            setattr(workflow, field, value)
            update_data[field] = value

        # Persist changes to database if any updates were made
        if update_data:
            self.interfaceDbChat.updateWorkflow(workflow.id, update_data)

        shown_round, shown_task, shown_action = (
            getattr(workflow, field, 'N/A') for field, _ in requested
        )
        logger.debug(f"Updated workflow context: Round {shown_round}, Task {shown_task}, Action {shown_action}")
    except Exception as e:
        logger.error(f"Error setting workflow context: {str(e)}")
|
|
|
|
def getWorkflowStats(self) -> Dict[str, Any]:
    """Combine the current workflow position with totals, status and id.

    Returns:
        A dict with 'currentRound', 'currentTask', 'currentAction' (taken from
        ``getWorkflowContext``), 'totalTasks', 'totalActions',
        'workflowStatus' and 'workflowId' (read from ``self.workflow``).
        Missing totals default to 0, missing status/id to 'unknown'. If
        anything raises, the error is logged and an all-default dict is
        returned.
    """
    try:
        position = self.getWorkflowContext()
        stats = {
            'currentRound': position['currentRound'],
            'currentTask': position['currentTask'],
            'currentAction': position['currentAction'],
        }
        stats['totalTasks'] = getattr(self.workflow, 'totalTasks', 0)
        stats['totalActions'] = getattr(self.workflow, 'totalActions', 0)
        stats['workflowStatus'] = getattr(self.workflow, 'status', 'unknown')
        stats['workflowId'] = getattr(self.workflow, 'id', 'unknown')
        return stats
    except Exception as e:
        logger.error(f"Error getting workflow stats: {str(e)}")
        fallback = dict.fromkeys(
            ('currentRound', 'currentTask', 'currentAction', 'totalTasks', 'totalActions'), 0
        )
        fallback['workflowStatus'] = 'unknown'
        fallback['workflowId'] = 'unknown'
        return fallback
|
|
|
|
def createWorkflow(self, workflowData: Dict[str, Any]):
    """Create a workflow through the chat interface.

    Args:
        workflowData: Passed unchanged to ``interfaceDbChat.createWorkflow``.

    Returns:
        The chat interface's result.

    Raises:
        Exception: Whatever the chat interface raises, after logging it.
    """
    try:
        created = self.interfaceDbChat.createWorkflow(workflowData)
    except Exception as e:
        logger.error(f"Error creating workflow: {str(e)}")
        raise
    return created
|
|
|
|
def updateWorkflow(self, workflowId: str, updateData: Dict[str, Any]):
    """Update a workflow through the chat interface.

    Args:
        workflowId: Identifier of the workflow to update.
        updateData: Passed unchanged to ``interfaceDbChat.updateWorkflow``.

    Returns:
        The chat interface's result.

    Raises:
        Exception: Whatever the chat interface raises, after logging it.
    """
    try:
        updated = self.interfaceDbChat.updateWorkflow(workflowId, updateData)
    except Exception as e:
        logger.error(f"Error updating workflow: {str(e)}")
        raise
    return updated
|
|
|
|
def updateWorkflowStats(self, workflowId: str, **kwargs):
    """Update workflow statistics through the chat interface.

    Args:
        workflowId: Identifier of the workflow whose stats change.
        **kwargs: Forwarded unchanged to ``interfaceDbChat.updateWorkflowStats``.

    Returns:
        The chat interface's result.

    Raises:
        Exception: Whatever the chat interface raises, after logging it.
    """
    try:
        outcome = self.interfaceDbChat.updateWorkflowStats(workflowId, **kwargs)
    except Exception as e:
        logger.error(f"Error updating workflow stats: {str(e)}")
        raise
    return outcome
|
|
|
|
def getWorkflow(self, workflowId: str):
    """Fetch a workflow by id through the chat interface.

    Args:
        workflowId: Identifier passed to ``interfaceDbChat.getWorkflow``.

    Returns:
        The chat interface's result; a falsy result is logged as a warning
        and returned as is.

    Raises:
        Exception: Whatever the lookup raises, after logging it.
    """
    try:
        logger.debug(f"getWorkflow called with workflowId: {workflowId}")
        found = self.interfaceDbChat.getWorkflow(workflowId)
        if not found:
            logger.warning(f"getWorkflow returned None for workflowId: {workflowId}")
        else:
            logger.debug(f"getWorkflow returned workflow with ID: {found.id}")
        return found
    except Exception as e:
        logger.error(f"Error getting workflow: {str(e)}")
        raise
|
|
|
|
def createMessage(self, messageData: Dict[str, Any]):
    """Create a new message by delegating to the chat interface and append to in-memory workflow.

    After the message is created it is appended to
    ``self.services.currentWorkflow.messages`` unless a message with the same
    ``id`` is already there. A failure of this in-memory sync never fails the
    call; it is logged as a warning instead of being silently discarded.

    Args:
        messageData: Passed unchanged to ``interfaceDbChat.createMessage``.

    Returns:
        The message returned by the chat interface.

    Raises:
        Exception: Whatever ``interfaceDbChat.createMessage`` raises, after
            logging it.
    """
    try:
        message = self.interfaceDbChat.createMessage(messageData)
        try:
            # Keep in-memory workflow messages in sync
            workflow = getattr(self.services, 'currentWorkflow', None)
            if workflow and hasattr(workflow, 'messages') and message:
                # Avoid duplicates if same message was already appended
                message_id = getattr(message, 'id', None)
                already_present = any(
                    getattr(existing, 'id', None) == message_id
                    for existing in workflow.messages
                )
                if not already_present:
                    workflow.messages.append(message)
        except Exception as sync_error:
            # Never fail if local append has issues, but leave a trace of it
            logger.warning(f"Could not sync created message into in-memory workflow: {str(sync_error)}")
        return message
    except Exception as e:
        logger.error(f"Error creating message: {str(e)}")
        raise
|
|
|
|
def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
    """Update a message through the chat interface.

    Args:
        messageId: Identifier of the message to update.
        messageData: Passed unchanged to ``interfaceDbChat.updateMessage``.

    Returns:
        The chat interface's result.

    Raises:
        Exception: Whatever the chat interface raises, after logging it.
    """
    try:
        updated = self.interfaceDbChat.updateMessage(messageId, messageData)
    except Exception as e:
        logger.error(f"Error updating message: {str(e)}")
        raise
    return updated
|
|
|
|
def createLog(self, logData: Dict[str, Any]):
    """Create a new log entry by delegating to the chat interface and append to in-memory workflow logs.

    After the entry is created it is appended to
    ``self.services.currentWorkflow.logs`` unless it is already there.
    Entries with an ``id`` are de-duplicated by id; entries without one are
    de-duplicated by their ``(message, publishedAt)`` pair. A failure of this
    in-memory sync never fails the call; it is logged as a warning instead of
    being silently discarded.

    Args:
        logData: Passed unchanged to ``interfaceDbChat.createLog``.

    Returns:
        The log entry returned by the chat interface.

    Raises:
        Exception: Whatever ``interfaceDbChat.createLog`` raises, after
            logging it.
    """
    try:
        log_entry = self.interfaceDbChat.createLog(logData)
        try:
            workflow = getattr(self.services, 'currentWorkflow', None)
            if workflow and hasattr(workflow, 'logs') and log_entry:
                # Avoid duplicates by id if present, else compare message+timestamp tuple
                entry_id = getattr(log_entry, 'id', None)
                if entry_id is not None:
                    already_present = any(
                        getattr(existing, 'id', None) == entry_id
                        for existing in workflow.logs
                    )
                else:
                    key = (getattr(log_entry, 'message', None), getattr(log_entry, 'publishedAt', None))
                    already_present = any(
                        (getattr(existing, 'message', None), getattr(existing, 'publishedAt', None)) == key
                        for existing in workflow.logs
                    )
                if not already_present:
                    workflow.logs.append(log_entry)
        except Exception as sync_error:
            # Best effort only: the created entry is still returned
            logger.warning(f"Could not sync created log into in-memory workflow: {str(sync_error)}")
        return log_entry
    except Exception as e:
        logger.error(f"Error creating log: {str(e)}")
        raise
|
|
|
|
def getDocumentCount(self) -> str:
    """Describe how many documents the current workflow's messages carry.

    Returns:
        "<n> document(s) available" when at least one document exists,
        otherwise "No documents available" (also used when counting fails;
        the failure is logged).
    """
    try:
        workflow = self.services.currentWorkflow

        # Sum the document lists of every message that has one.
        total_docs = sum(
            len(message.documents)
            for message in workflow.messages
            if hasattr(message, 'documents') and message.documents
        )

        if total_docs != 0:
            return f"{total_docs} document(s) available"
        return "No documents available"
    except Exception as e:
        logger.error(f"Error getting document count: {str(e)}")
        return "No documents available"
|
|
|
|
def getWorkflowHistoryContext(self) -> str:
    """Summarize which document lists earlier rounds of the current workflow hold.

    Returns:
        A text starting with "Previous workflow rounds contain documents:"
        followed by one "- docList:..." line per history exchange, or
        "No previous round context available" when no message has status
        "first", when there are no history exchanges, or when an error occurs
        (the error is logged).
    """
    nothing_available = "No previous round context available"
    try:
        workflow = self.services.currentWorkflow

        # A message with status "first" is the precondition for any history.
        has_first_message = any(
            hasattr(message, 'status') and message.status == "first"
            for message in workflow.messages
        )
        if not has_first_message:
            return nothing_available

        # Only the "history" bucket (previous rounds) is reported here.
        document_list = self._getDocumentReferenceList(workflow)
        history = document_list["history"]
        if not history:
            return nothing_available

        lines = ["Previous workflow rounds contain documents:\n"]
        for exchange in history:
            # First message carrying this exchange's label, if any
            owner_id = next(
                (
                    message.id
                    for message in workflow.messages
                    if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange['documentsLabel']
                ),
                None,
            )

            if owner_id:
                doc_list_ref = f"docList:{owner_id}:{exchange['documentsLabel']}"
            else:
                doc_list_ref = f"docList:{exchange['documentsLabel']}"

            lines.append(f"- {doc_list_ref} ({len(exchange['documents'])} documents)\n")

        return "".join(lines)

    except Exception as e:
        logger.error(f"Error getting workflow history context: {str(e)}")
        return "No previous round context available"
|
|
|
|
def getAvailableDocuments(self, workflow) -> str:
    """Render the workflow's documents as a reference index for AI prompts.

    The index lists current-round exchanges first ("Current round
    documents:") and previous rounds afterwards ("Past rounds documents:").
    Each exchange is shown as a ``docList:`` reference followed by its
    ``docItem:`` entries.

    Args:
        workflow: Workflow object whose ``messages`` are inspected. It is used
            as passed in; nothing is reloaded here.

    Returns:
        The rendered index; "No documents available" when ``workflow`` is
        falsy or has no ``messages`` attribute; a "NO DOCUMENTS AVAILABLE"
        notice when no exchanges exist or when rendering fails (the failure
        is logged).
    """
    try:
        if not workflow or not hasattr(workflow, 'messages'):
            return "No documents available"

        # Use the provided workflow object directly to avoid database reload issues
        # that can cause filename truncation. The workflow object should already be up-to-date.
        logger.debug(f"Using provided workflow object for getAvailableDocuments (ID: {workflow.id if hasattr(workflow, 'id') else 'unknown'})")

        # Debug: Check document filenames in the workflow object
        if hasattr(workflow, 'messages') and workflow.messages:
            for message in workflow.messages:
                if not (hasattr(message, 'documents') and message.documents):
                    continue
                for doc in message.documents:
                    logger.debug(f"Workflow document {doc.id}: fileName='{doc.fileName}' (length: {len(doc.fileName)})")

        document_list = self._getDocumentReferenceList(workflow)

        # Optional: dump a concise document index for debugging
        try:
            debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
            if debug_enabled:
                import os
                import json
                from datetime import datetime, UTC
                ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
                debug_root = "./test-chat/ai"
                os.makedirs(debug_root, exist_ok=True)
                doc_index = [
                    {
                        "bucket": bucket,
                        "label": ex.get("documentsLabel"),
                        "documents": ex.get("documents", [])
                    }
                    for bucket in ("chat", "history")
                    for ex in document_list.get(bucket, []) or []
                ]
                dump_path = os.path.join(debug_root, f"{ts}_available_documents_index.json")
                with open(dump_path, "w", encoding="utf-8") as f:
                    json.dump({
                        "workflowId": getattr(workflow, 'id', None),
                        "index": doc_index
                    }, f, ensure_ascii=False, indent=2)
        except Exception:
            # The debug dump is optional; its failure must not affect the result
            pass

        def render_section(heading, exchanges):
            # One section of the index: heading, then every exchange with its items.
            if not exchanges:
                return ""
            pieces = [heading]
            for exchange in exchanges:
                # First message carrying this exchange's label, if any
                owner_id = None
                for message in workflow.messages:
                    if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange['documentsLabel']:
                        owner_id = message.id
                        break

                if owner_id:
                    doc_list_ref = f"docList:{owner_id}:{exchange['documentsLabel']}"
                else:
                    # Fallback to label-only format if message ID not found
                    doc_list_ref = f"docList:{exchange['documentsLabel']}"

                pieces.append(f"- {doc_list_ref} contains:\n")
                for doc_ref in exchange['documents']:
                    if doc_ref.startswith("docItem:"):
                        pieces.append(f" - {doc_ref}\n")
                    else:
                        # Convert to proper docItem format if needed
                        pieces.append(f" - docItem:{doc_ref}\n")
                pieces.append("\n")
            return "".join(pieces)

        # Current round first, previous rounds after
        context = render_section("\nCurrent round documents:\n", document_list["chat"])
        context += render_section("\nPast rounds documents:\n", document_list["history"])

        if not document_list["chat"] and not document_list["history"]:
            context += "\nNO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n"

        return context

    except Exception as e:
        logger.error(f"Error getting available documents: {str(e)}")
        return "NO DOCUMENTS AVAILABLE - Error generating document context."
|
|
|
|
def _getDocumentReferenceList(self, workflow) -> Dict[str, List]:
|
|
"""Get list of document exchanges with new labeling format, sorted by recency (exact copy of old system)"""
|
|
# Collect all documents first and refresh their attributes
|
|
all_documents = []
|
|
for message in workflow.messages:
|
|
if message.documents:
|
|
all_documents.extend(message.documents)
|
|
|
|
# Refresh file attributes for all documents
|
|
if all_documents:
|
|
self._refreshDocumentFileAttributes(all_documents)
|
|
|
|
def _is_valid_document(doc) -> bool:
|
|
try:
|
|
size_ok = getattr(doc, 'fileSize', 0) and getattr(doc, 'fileSize', 0) > 0
|
|
id_ok = bool(getattr(doc, 'fileId', None))
|
|
mime_ok = bool(getattr(doc, 'mimeType', None))
|
|
return size_ok and id_ok and mime_ok
|
|
except Exception:
|
|
return False
|
|
|
|
chat_exchanges = []
|
|
history_exchanges = []
|
|
|
|
in_current_round = True
|
|
for message in reversed(workflow.messages):
|
|
is_first = message.status == "first" if hasattr(message, 'status') else False
|
|
|
|
doc_exchange = None
|
|
if message.documents:
|
|
existing_label = getattr(message, 'documentsLabel', None)
|
|
if existing_label:
|
|
validated_label = self._validateDocumentLabelConsistency(message)
|
|
doc_refs = []
|
|
for doc in message.documents:
|
|
if not _is_valid_document(doc):
|
|
# Skip empty/invalid docs
|
|
continue
|
|
doc_ref = self._getDocumentReferenceFromChatDocument(doc, message)
|
|
doc_refs.append(doc_ref)
|
|
if doc_refs:
|
|
doc_exchange = {
|
|
'documentsLabel': validated_label,
|
|
'documents': doc_refs
|
|
}
|
|
|
|
if doc_exchange:
|
|
if in_current_round:
|
|
chat_exchanges.append(doc_exchange)
|
|
else:
|
|
history_exchanges.append(doc_exchange)
|
|
|
|
if in_current_round and is_first:
|
|
in_current_round = False
|
|
|
|
chat_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
|
|
history_exchanges.sort(key=lambda x: self._getMessageSequenceForExchange(x, workflow), reverse=True)
|
|
|
|
return {
|
|
"chat": chat_exchanges,
|
|
"history": history_exchanges
|
|
}
|
|
|
|
def _refreshDocumentFileAttributes(self, documents) -> None:
|
|
"""Update file attributes (fileName, fileSize, mimeType) for documents"""
|
|
for doc in documents:
|
|
try:
|
|
original_filename = doc.fileName
|
|
logger.debug(f"Before refresh - Document {doc.id}: fileName='{original_filename}' (length: {len(original_filename)})")
|
|
|
|
# Skip invalid docs early if essential identifiers are missing
|
|
if not getattr(doc, 'fileId', None):
|
|
logger.debug(f"Skipping document {doc.id} due to missing fileId")
|
|
setattr(doc, 'fileSize', 0)
|
|
setattr(doc, 'mimeType', None)
|
|
continue
|
|
|
|
file_info = self.getFileInfo(doc.fileId)
|
|
if file_info:
|
|
db_filename = file_info.get("fileName", doc.fileName)
|
|
logger.debug(f"Database filename for {doc.id}: '{db_filename}' (length: {len(db_filename)})")
|
|
|
|
doc.fileName = file_info.get("fileName", doc.fileName)
|
|
doc.fileSize = file_info.get("size", doc.fileSize)
|
|
doc.mimeType = file_info.get("mimeType", doc.mimeType)
|
|
|
|
# Mark invalid if missing mimeType
|
|
if not doc.mimeType:
|
|
logger.debug(f"Document {doc.id} has missing mimeType; will be filtered from index")
|
|
setattr(doc, 'fileSize', 0)
|
|
|
|
logger.debug(f"After refresh - Document {doc.id}: fileName='{doc.fileName}' (length: {len(doc.fileName)})")
|
|
else:
|
|
logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
|
|
setattr(doc, 'fileSize', 0)
|
|
setattr(doc, 'mimeType', None)
|
|
except Exception as e:
|
|
logger.error(f"Error refreshing file attributes for document {doc.id}: {e}")
|
|
|
|
def _generateWorkflowContextPrefix(self, message) -> str:
|
|
"""Generate workflow context prefix: round{num}_task{num}_action{num}"""
|
|
round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1
|
|
task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0
|
|
action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0
|
|
return f"round{round_num}_task{task_num}_action{action_num}"
|
|
|
|
def _getDocumentReferenceFromChatDocument(self, document, message) -> str:
|
|
"""Get document reference using document ID and filename."""
|
|
try:
|
|
# Debug logging to track filename truncation
|
|
logger.debug(f"Creating document reference for {document.id}: fileName='{document.fileName}' (length: {len(document.fileName)})")
|
|
# Use document ID and filename for simple reference
|
|
return f"docItem:{document.id}:{document.fileName}"
|
|
except Exception as e:
|
|
logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}")
|
|
# Re-raise the error to prevent workflow from continuing with invalid data
|
|
raise
|
|
|
|
def _getMessageSequenceForExchange(self, exchange, workflow) -> int:
|
|
"""Get message sequence number for sorting exchanges by recency"""
|
|
try:
|
|
# Extract message ID from the first document reference
|
|
if exchange['documents'] and len(exchange['documents']) > 0:
|
|
first_doc_ref = exchange['documents'][0]
|
|
if first_doc_ref.startswith("docItem:"):
|
|
# docItem:<id>:<label> - extract ID
|
|
parts = first_doc_ref.split(':')
|
|
if len(parts) >= 2:
|
|
doc_id = parts[1]
|
|
# Find the message containing this document
|
|
for message in workflow.messages:
|
|
if message.documents:
|
|
for doc in message.documents:
|
|
if doc.id == doc_id:
|
|
return message.sequenceNr if hasattr(message, 'sequenceNr') else 0
|
|
elif first_doc_ref.startswith("docList:"):
|
|
# docList:<message_id>:<label> - extract message ID
|
|
parts = first_doc_ref.split(':')
|
|
if len(parts) >= 2:
|
|
message_id = parts[1]
|
|
# Find the message by ID
|
|
for message in workflow.messages:
|
|
if str(message.id) == message_id:
|
|
return message.sequenceNr if hasattr(message, 'sequenceNr') else 0
|
|
return 0
|
|
except Exception as e:
|
|
logger.error(f"Error getting message sequence for exchange: {str(e)}")
|
|
return 0
|
|
|
|
def _validateDocumentLabelConsistency(self, message) -> str:
|
|
"""Validate that the document label used for references matches the message's actual label"""
|
|
if not hasattr(message, 'documentsLabel') or not message.documentsLabel:
|
|
return None
|
|
|
|
# Simply return the message's actual documentsLabel - no correction, just validation
|
|
return message.documentsLabel
|
|
|
|
def getConnectionReferenceList(self) -> List[str]:
    """Return a reference string for each connection of the current user.

    The connections are loaded with
    ``self.services.interfaceDbApp.getUserConnections(<user id>)`` and each one
    is turned into a reference by ``self.getConnectionReferenceFromUserConnection``.

    Returns:
        The list of references. An empty list is returned when the service
        center lacks ``interfaceDbApp`` or ``user``, when the user has no
        connections, or when an error occurs (the error is logged).
    """
    try:
        services = self.services
        if not (hasattr(services, 'interfaceDbApp') and hasattr(services, 'user')):
            return []

        userId = services.user.id
        connections = services.interfaceDbApp.getUserConnections(userId)
        if not connections:
            return []

        # One reference per connection, in the order the database returned them
        return [self.getConnectionReferenceFromUserConnection(conn) for conn in connections]
    except Exception as e:
        logger.error(f"Error getting connection reference list: {str(e)}")
        return []