909 lines
No EOL
44 KiB
Python
909 lines
No EOL
44 KiB
Python
import logging
|
|
import uuid
|
|
from typing import Dict, Any, List, Optional
|
|
from modules.datamodels.datamodelUam import User, UserConnection
|
|
from modules.datamodels.datamodelChat import ChatDocument, ChatMessage, ChatStat, ChatLog
|
|
from modules.datamodels.datamodelChat import ChatContentExtracted
|
|
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
|
|
from modules.services.serviceGeneration.subDocumentUtility import getFileExtension, getMimeTypeFromExtension, detectContentTypeFromData
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
from modules.services.serviceAi.mainServiceAi import AiService
|
|
from modules.security.tokenManager import TokenManager
|
|
from modules.shared.progressLogger import ProgressLogger
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class WorkflowService:
    """Service class containing methods for document processing, chat operations, and workflow management"""

    def __init__(self, serviceCenter):
        """Keep a handle on the service center and cache the collaborators read from it.

        Args:
            serviceCenter: Object exposing ``user``, ``workflow``, ``interfaceDbChat``,
                ``interfaceDbComponent`` and ``interfaceDbApp``.
        """
        self.services = serviceCenter
        self.user = serviceCenter.user
        self.workflow = serviceCenter.workflow
        self.interfaceDbChat = serviceCenter.interfaceDbChat
        self.interfaceDbComponent = serviceCenter.interfaceDbComponent
        self.interfaceDbApp = serviceCenter.interfaceDbApp

    async def summarizeChat(self, messages: List[ChatMessage]) -> str:
        """
        Summarize chat messages from last to first message with status="first"

        Args:
            messages: List of chat messages to summarize

        Returns:
            str: Summary of the chat in user's language. On failure the error text
            is returned instead of raising.
        """
        try:
            # Walk backwards and stop at (and include) the first message whose
            # status is "first".
            relevantMessages = []
            for msg in reversed(messages):
                relevantMessages.append(msg)
                if msg.status == "first":
                    break

            # Restore chronological order for the prompt.
            chatHistory = "\n".join(f"- {msg.message}" for msg in reversed(relevantMessages))

            prompt = f"""You are an AI assistant providing a summary of a chat conversation.
Please respond in '{self.user.language}' language.

Chat History:
{chatHistory}

Instructions:
1. Summarize the conversation's key points and outcomes
2. Be concise but informative
3. Use a professional but friendly tone
4. Focus on important decisions and next steps if any

Please provide a comprehensive summary of this conversation."""

            # Get summary using AI service directly (avoiding circular dependency)
            # NOTE(review): AiService receives this WorkflowService, not the service
            # center - confirm that is the intended argument.
            ai_service = AiService(self)
            return await ai_service.callAi(
                prompt=prompt,
                documents=None,
                options={
                    "process_type": "text",
                    "operation_type": "generate_content",
                    "priority": "speed",
                    "compress_prompt": True,
                    "compress_documents": False,
                    "max_cost": 0.01
                }
            )

        except Exception as e:
            logger.error(f"Error summarizing chat: {str(e)}")
            return f"Error summarizing chat: {str(e)}"

    @staticmethod
    def _logAvailableMessages(workflow) -> None:
        """Debug-log every workflow message with its label and document names (best effort)."""
        try:
            if workflow and hasattr(workflow, 'messages') and workflow.messages:
                msg_lines = []
                for message in workflow.messages:
                    label = getattr(message, 'documentsLabel', None)
                    doc_names = [
                        getattr(doc, 'fileName', None) or getattr(doc, 'documentName', None) or 'Unnamed'
                        for doc in (getattr(message, 'documents', None) or [])
                    ]
                    msg_lines.append(
                        f"- id={getattr(message, 'id', None)}, label={label}, docs={doc_names}"
                    )
                if msg_lines:
                    logger.debug("getChatDocumentsFromDocumentList: available messages:\n" + "\n".join(msg_lines))
            else:
                logger.debug("getChatDocumentsFromDocumentList: no messages available on current workflow")
        except Exception as e:
            logger.debug(f"getChatDocumentsFromDocumentList: unable to enumerate messages for debug: {e}")

    @staticmethod
    def _findDocumentById(workflow, doc_id: str):
        """Return the first document in the workflow whose id (as string) equals ``doc_id``, else None."""
        for message in workflow.messages:
            for doc in message.documents or []:
                # Ids are compared as strings, the same way message ids are compared.
                if str(doc.id) == doc_id:
                    return doc
        return None

    @staticmethod
    def _findNewestMessageByLabel(workflow, label: str):
        """Return the message labelled ``label`` with the highest ``publishedAt``, else None.

        A missing or None ``publishedAt`` counts as 0. On ties the earliest message
        in ``workflow.messages`` wins, which is what a stable descending sort yields.
        """
        matching = [m for m in workflow.messages if getattr(m, 'documentsLabel', None) == label]
        if not matching:
            return None
        return max(matching, key=lambda m: getattr(m, 'publishedAt', 0) or 0)

    def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
        """Get ChatDocuments from a list of document references using all three formats.

        Supported references:
            * ``docItem:<documentId>:<fileName>`` - a single document, looked up by id.
            * ``docList:<messageId>:<label>`` - all documents of the message with that id.
            * ``docList:<label>`` - all documents of the newest message with that label.
            * ``round<N>_task<N>_action<N>_<context>`` - bare label, resolved like
              ``docList:<label>``.

        Args:
            documentList: Reference strings to resolve against ``self.services.currentWorkflow``.

        Returns:
            The resolved documents in reference order. Any error - including a
            ``docList``/label reference that matches no message - is logged and
            results in an empty list; nothing is raised to the caller.
        """
        try:
            workflow = self.services.currentWorkflow
            logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
            logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}")

            # Debug: list available messages with their labels and document names
            self._logAvailableMessages(workflow)

            all_documents = []
            for doc_ref in documentList:
                if doc_ref.startswith("docItem:"):
                    # docItem:<id>:<filename> - only the id is used for the lookup.
                    # An unknown id is skipped silently.
                    doc = self._findDocumentById(workflow, doc_ref.split(':')[1])
                    if doc is not None:
                        all_documents.append(doc)

                elif doc_ref.startswith("docList:"):
                    parts = doc_ref.split(':')
                    if len(parts) >= 3:
                        # Format: docList:<messageId>:<label>
                        message_id, label = parts[1], parts[2]
                        message = next((m for m in workflow.messages if str(m.id) == message_id), None)
                        if message is None:
                            available_ids = [str(msg.id) for msg in workflow.messages]
                            logger.error(f"Message with ID {message_id} not found in workflow. Available message IDs: {available_ids}")
                            raise ValueError(f"Document reference not found: docList:{message_id}:{label}")
                    else:
                        # Format: docList:<label> - newest message with that label
                        label = parts[1]
                        message = self._findNewestMessageByLabel(workflow, label)
                        if message is None:
                            logger.error(f"No messages found with documentsLabel: {label}")
                            raise ValueError(f"Document reference not found: docList:{label}")
                    if message.documents:
                        all_documents.extend(message.documents)

                elif doc_ref.startswith("round"):
                    # Direct label reference (round1_task2_action3_contextinfo)
                    label_parts = doc_ref.split('_', 3)
                    if len(label_parts) >= 4:
                        # Validation only: a non-numeric round/task/action raises
                        # ValueError, which the outer handler turns into [].
                        for prefix, part in zip(("round", "task", "action"), label_parts):
                            int(part.replace(prefix, ''))

                        # In case of retries, we want the NEWEST message (most recent publishedAt)
                        newest_message = self._findNewestMessageByLabel(workflow, doc_ref)
                        if newest_message is None:
                            logger.error(f"No messages found with documentsLabel: {doc_ref}")
                            raise ValueError(f"Document reference not found: {doc_ref}")

                        if newest_message.documents:
                            doc_names = [doc.fileName for doc in newest_message.documents if hasattr(doc, 'fileName')]
                            logger.debug(f"Added {len(newest_message.documents)} documents from newest message {newest_message.id}: {doc_names}")
                            all_documents.extend(newest_message.documents)
                        else:
                            logger.debug(f"No documents found in newest message {newest_message.id}")

            logger.debug(f"Resolved {len(all_documents)} documents from document list: {documentList}")
            return all_documents
        except Exception as e:
            logger.error(f"Error getting documents from document list: {str(e)}")
            return []

    def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str:
        """Get connection reference from UserConnection with enhanced state information

        Args:
            connection: Connection to describe; its ``id``, ``authority.value``,
                ``externalUsername`` and ``status.value`` are read.

        Returns:
            ``connection:<authority>:<username> [status:<status>, token:<token_status>]``
            where ``token_status`` is one of ``expired``, ``valid (refreshed)``,
            ``valid``, ``no_expiration``, ``no_token`` or ``error: <message>``.
        """
        token_status = "unknown"
        try:
            # Get a fresh token via TokenManager convenience method
            logger.debug(f"Getting fresh token for connection {connection.id}")
            token = TokenManager().getFreshToken(connection.id)
            if not token:
                token_status = "no_token"
            elif not getattr(token, 'expiresAt', None):
                token_status = "no_expiration"
            else:
                current_time = get_utc_timestamp()
                if current_time > token.expiresAt:
                    token_status = "expired"
                else:
                    # "refreshed" means created within the last 5 minutes. A token
                    # without a creation time is reported as plain "valid" rather
                    # than being mistaken for a fresh one.
                    created_at = getattr(token, 'createdAt', None)
                    recently_created = created_at is not None and current_time - created_at < 300
                    token_status = "valid (refreshed)" if recently_created else "valid"
        except Exception as e:
            token_status = f"error: {str(e)}"

        # Build enhanced reference with state information
        # Format: connection:msft:<username> (without UUID)
        base_ref = f"connection:{connection.authority.value}:{connection.externalUsername}"
        state_info = f" [status:{connection.status.value}, token:{token_status}]"

        logger.debug(f"getConnectionReferenceFromUserConnection: Built reference: {base_ref + state_info}")
        return base_ref + state_info

    def getUserConnectionByExternalUsername(self, authority: str, externalUsername: str) -> Optional[UserConnection]:
        """Fetch the user's connection by authority and external username.

        Authority is matched case-insensitively and may be stored as an enum or a
        string; the username must match exactly.

        Returns:
            The first matching connection, or None when nothing matches, an
            argument is empty, or the lookup fails (the failure is logged).
        """
        try:
            if not authority or not externalUsername:
                return None
            wanted_authority = authority.lower()
            for connection in self.interfaceDbApp.getUserConnections(self.user.id):
                # Normalize authority for comparison (enum vs string)
                raw_authority = connection.authority
                connection_authority = raw_authority.value if hasattr(raw_authority, 'value') else str(raw_authority)
                if connection_authority.lower() == wanted_authority and connection.externalUsername == externalUsername:
                    return connection
            return None
        except Exception as e:
            logger.error(f"Error getting connection by external username: {str(e)}")
            return None

    def getUserConnectionFromConnectionReference(self, connectionReference: str) -> Optional[UserConnection]:
        """Get UserConnection from reference string (handles new format without UUID)

        Args:
            connectionReference: ``connection:<authority>:<username>`` optionally
                followed by `` [status:..., token:...]``.

        Returns:
            The matching connection or None. Matching is delegated to
            ``getUserConnectionByExternalUsername`` so both lookups agree.
        """
        try:
            # Remove state information if present
            base_reference = connectionReference.split(' [')[0]

            # maxsplit=2 keeps any ':' inside the username intact.
            parts = base_reference.split(':', 2)
            if len(parts) != 3 or parts[0] != "connection":
                return None

            return self.getUserConnectionByExternalUsername(parts[1], parts[2])
        except Exception as e:
            logger.error(f"Error parsing connection reference: {str(e)}")
            return None

    def getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Get file information

        Returns:
            Dict with ``id``, ``fileName``, ``size``, ``mimeType``, ``fileHash`` and
            ``creationDate``, or None when the component interface returns no file.
        """
        file_item = self.interfaceDbComponent.getFile(fileId)
        if not file_item:
            return None
        return {
            "id": file_item.id,
            "fileName": file_item.fileName,
            "size": file_item.fileSize,
            "mimeType": file_item.mimeType,
            "fileHash": file_item.fileHash,
            "creationDate": file_item.creationDate
        }

    def getFileData(self, fileId: str) -> bytes:
        """Get file data by ID"""
        return self.interfaceDbComponent.getFileData(fileId)

    def _diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]:
        """
        Diagnose document access issues and provide recovery information.
        This method helps identify why document properties are inaccessible.

        Returns:
            Dict with ``document_id``, ``file_id``, ``has_component_interface``,
            ``component_interface_type``, ``file_exists``, ``file_info`` and
            ``error_details``. If the diagnosis itself fails, a reduced dict with
            ``document_id``, ``file_id`` and ``error_details`` is returned.
        """
        try:
            diagnosis = {
                'document_id': document.id,
                'file_id': document.fileId,
                'has_component_interface': document._componentInterface is not None,
                'component_interface_type': type(document._componentInterface).__name__ if document._componentInterface else None,
                'file_exists': False,
                'file_info': None,
                'error_details': None
            }

            # Check if component interface is set
            if not document._componentInterface:
                diagnosis['error_details'] = "Component interface not set - document cannot access file system"
                return diagnosis

            # Try to access the file directly through this service's own interface
            try:
                file_info = self.interfaceDbComponent.getFile(document.fileId)
                if file_info:
                    diagnosis['file_exists'] = True
                    diagnosis['file_info'] = {
                        'fileName': getattr(file_info, 'fileName', 'N/A'),
                        'fileSize': getattr(file_info, 'fileSize', 'N/A'),
                        'mimeType': getattr(file_info, 'mimeType', 'N/A')
                    }
                else:
                    diagnosis['error_details'] = f"File with ID {document.fileId} not found in component interface"
            except Exception as e:
                diagnosis['error_details'] = f"Error accessing file {document.fileId}: {str(e)}"

            return diagnosis

        except Exception as e:
            return {
                'document_id': getattr(document, 'id', 'unknown'),
                'file_id': getattr(document, 'fileId', 'unknown'),
                'error_details': f"Error during diagnosis: {str(e)}"
            }

    def _recoverDocumentAccess(self, document: ChatDocument) -> bool:
        """
        Attempt to recover document access by re-setting the component interface.
        Returns True if recovery was successful.
        """
        try:
            logger.info(f"Attempting to recover document access for document {document.id}")

            # Re-set the component interface
            document.setComponentInterface(self.interfaceDbComponent)

            # Reading fileName is the probe: if it raises, recovery failed.
            try:
                test_fileName = document.fileName
            except Exception as e:
                logger.error(f"Document access recovery failed for {document.id}: {str(e)}")
                return False
            logger.info(f"Document access recovered for {document.id} -> {test_fileName}")
            return True

        except Exception as e:
            logger.error(f"Error during document access recovery for {document.id}: {str(e)}")
            return False

    def calculateObjectSize(self, obj: Any) -> int:
        """
        Calculate the size of an object in bytes.

        The size is the UTF-8 length of the object's JSON serialization; values
        JSON cannot encode are stringified with ``str``.

        Args:
            obj: Object to calculate size for

        Returns:
            int: Size in bytes (0 for None or when serialization fails)
        """
        try:
            import json

            if obj is None:
                return 0

            # Convert object to JSON string and calculate size
            json_str = json.dumps(obj, ensure_ascii=False, default=str)
            return len(json_str.encode('utf-8'))

        except Exception as e:
            logger.error(f"Error calculating object size: {str(e)}")
            return 0

    def getWorkflowContext(self) -> Dict[str, int]:
        """Get current workflow context for document generation

        NOTE(review): this reads ``self.workflow`` while ``setWorkflowContext``
        writes ``self.services.currentWorkflow`` - confirm both refer to the same
        object.
        """
        try:
            return {
                'currentRound': getattr(self.workflow, 'currentRound', 0),
                'currentTask': getattr(self.workflow, 'currentTask', 0),
                'currentAction': getattr(self.workflow, 'currentAction', 0)
            }
        except Exception as e:
            logger.error(f"Error getting workflow context: {str(e)}")
            return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}

    def setWorkflowContext(self, round_number: int = None, task_number: int = None, action_number: int = None):
        """Set current workflow context for document generation and routing

        Only arguments that are not None are applied. They are written to the
        in-memory workflow and persisted through ``interfaceDbChat.updateWorkflow``.
        Errors are logged, not raised.
        """
        try:
            workflow = self.services.currentWorkflow

            # Prepare update data
            update_data = {}
            requested = (
                ("currentRound", round_number),
                ("currentTask", task_number),
                ("currentAction", action_number),
            )
            for field_name, value in requested:
                if value is not None:
                    setattr(workflow, field_name, value)
                    update_data[field_name] = value

            # Persist changes to database if any updates were made
            if update_data:
                self.interfaceDbChat.updateWorkflow(workflow.id, update_data)

            logger.debug(f"Updated workflow context: Round {getattr(workflow, 'currentRound', 'N/A')}, Task {getattr(workflow, 'currentTask', 'N/A')}, Action {getattr(workflow, 'currentAction', 'N/A')}")
        except Exception as e:
            logger.error(f"Error setting workflow context: {str(e)}")

    def getWorkflowStats(self) -> Dict[str, Any]:
        """Get comprehensive workflow statistics including current context

        Returns zeros and ``'unknown'`` placeholders when the lookup fails.
        """
        try:
            workflow_context = self.getWorkflowContext()
            return {
                'currentRound': workflow_context['currentRound'],
                'currentTask': workflow_context['currentTask'],
                'currentAction': workflow_context['currentAction'],
                'totalTasks': getattr(self.workflow, 'totalTasks', 0),
                'totalActions': getattr(self.workflow, 'totalActions', 0),
                'workflowStatus': getattr(self.workflow, 'status', 'unknown'),
                'workflowId': getattr(self.workflow, 'id', 'unknown')
            }
        except Exception as e:
            logger.error(f"Error getting workflow stats: {str(e)}")
            return {
                'currentRound': 0,
                'currentTask': 0,
                'currentAction': 0,
                'totalTasks': 0,
                'totalActions': 0,
                'workflowStatus': 'unknown',
                'workflowId': 'unknown'
            }

    def createWorkflow(self, workflowData: Dict[str, Any]):
        """Create a new workflow by delegating to the chat interface (errors are logged and re-raised)"""
        try:
            return self.interfaceDbChat.createWorkflow(workflowData)
        except Exception as e:
            logger.error(f"Error creating workflow: {str(e)}")
            raise

    def updateWorkflow(self, workflowId: str, updateData: Dict[str, Any]):
        """Update workflow by delegating to the chat interface (errors are logged and re-raised)"""
        try:
            return self.interfaceDbChat.updateWorkflow(workflowId, updateData)
        except Exception as e:
            logger.error(f"Error updating workflow: {str(e)}")
            raise

    def getWorkflow(self, workflowId: str):
        """Get workflow by ID by delegating to the chat interface (errors are logged and re-raised)"""
        try:
            logger.debug(f"getWorkflow called with workflowId: {workflowId}")
            result = self.interfaceDbChat.getWorkflow(workflowId)
            if result:
                logger.debug(f"getWorkflow returned workflow with ID: {result.id}")
            else:
                logger.warning(f"getWorkflow returned None for workflowId: {workflowId}")
            return result
        except Exception as e:
            logger.error(f"Error getting workflow: {str(e)}")
            raise

    # === Service-level transactions (DB write-through + in-memory sync) ===

    @staticmethod
    def _replaceOrAppendById(items: List[Any], item: Any) -> None:
        """Replace the first element of ``items`` sharing ``item``'s id, or append ``item``."""
        item_id = getattr(item, 'id', None)
        for index, existing in enumerate(items):
            if getattr(existing, 'id', None) == item_id:
                items[index] = item
                return
        items.append(item)

    def storeMessageWithDocuments(self, workflow: Any, messageData: Dict[str, Any], documents: List[Any]) -> ChatMessage:
        """Persist message and documents, then bind them into in-memory workflow (replace-by-id).

        Args:
            workflow: In-memory workflow; its ``id`` is stamped on the message and its
                ``messages`` list is updated (created when it is None).
            messageData: Message fields; the caller's dict is not modified.
            documents: Documents to attach (None is treated as no documents).

        Raises:
            ValueError: If the chat interface returns no message.
        """
        # Copy so the caller's dict is untouched, then ensure workflowId and documents.
        messageDataWithDocs = dict(messageData or {})
        messageDataWithDocs["workflowId"] = workflow.id
        messageDataWithDocs["documents"] = documents or []

        chatMessage = self.interfaceDbChat.createMessage(messageDataWithDocs)
        if not chatMessage:
            raise ValueError("Failed to create message with documents")

        # In-memory sync. The message is already persisted at this point, so a
        # missing list must not make the call fail.
        if workflow.messages is None:
            workflow.messages = []
        self._replaceOrAppendById(workflow.messages, chatMessage)
        return chatMessage

    def storeLog(self, workflow: Any, logData: Dict[str, Any]) -> ChatLog:
        """Persist ChatLog and map it into the in-memory workflow logs list.

        Raises:
            ValueError: If the chat interface returns no log.
        """
        logData = dict(logData or {})
        logData["workflowId"] = workflow.id

        chatLog = self.interfaceDbChat.createLog(logData)
        if not chatLog:
            raise ValueError("Failed to create log")

        # Same guard as for stats: create the list when it is absent or None.
        if getattr(workflow, 'logs', None) is None:
            workflow.logs = []
        self._replaceOrAppendById(workflow.logs, chatLog)
        return chatLog

    def storeWorkflowStat(self, workflow: Any, aiResponse: Any, process: str) -> ChatStat:
        """Persist workflow-level ChatStat from AiCallResponse and append to workflow stats list.

        Errors are logged and re-raised.
        """
        try:
            # Create ChatStat from AiCallResponse data
            statData = {
                "workflowId": workflow.id,
                "process": process,
                "engine": aiResponse.modelName,
                "priceUsd": aiResponse.priceUsd,
                "processingTime": aiResponse.processingTime,
                "bytesSent": aiResponse.bytesSent,
                "bytesReceived": aiResponse.bytesReceived,
                "errorCount": aiResponse.errorCount
            }

            # Create the stat record in the database
            stat = self.interfaceDbChat.createStat(statData)

            # Append to workflow stats list in memory
            if getattr(workflow, 'stats', None) is None:
                workflow.stats = []
            workflow.stats.append(stat)

            return stat
        except Exception as e:
            logger.error(f"Failed to store workflow stat: {e}")
            raise

    def updateMessage(self, messageId: str, messageData: Dict[str, Any]):
        """Update message by delegating to the chat interface (errors are logged and re-raised)"""
        try:
            return self.interfaceDbChat.updateMessage(messageId, messageData)
        except Exception as e:
            logger.error(f"Error updating message: {str(e)}")
            raise

    def getDocumentCount(self) -> str:
        """Get document count for task planning (matching old handlingTasks.py logic)

        Returns a human-readable sentence, not a number.
        """
        try:
            workflow = self.services.currentWorkflow

            # Count documents from all messages in the workflow (like old system)
            total_docs = sum(
                len(message.documents)
                for message in workflow.messages
                if getattr(message, 'documents', None)
            )

            if total_docs == 0:
                return "No documents available"
            return f"{total_docs} document(s) available"
        except Exception as e:
            logger.error(f"Error getting document count: {str(e)}")
            return "No documents available"

    @staticmethod
    def _buildDocListReference(workflow, exchange: Dict[str, Any]) -> str:
        """Build the ``docList:`` reference for one exchange entry.

        The message id recorded on the entry is preferred, so the reference points
        at the message whose documents are listed. Entries without one fall back to
        the first message carrying the label, then to the label-only format.
        """
        label = exchange['documentsLabel']
        message_id = exchange.get('messageId')
        if not message_id:
            for message in workflow.messages:
                if hasattr(message, 'documentsLabel') and message.documentsLabel == label:
                    message_id = message.id
                    break
        if message_id:
            return f"docList:{message_id}:{label}"
        return f"docList:{label}"

    def getWorkflowHistoryContext(self) -> str:
        """Get workflow history context for task planning (matching old handlingTasks.py logic)

        Returns:
            One line per history exchange with its ``docList:`` reference and document
            count, or ``"No previous round context available"``.
        """
        try:
            workflow = self.services.currentWorkflow

            # Check if there are any previous rounds by looking for "first" messages
            has_previous_rounds = any(
                hasattr(message, 'status') and message.status == "first"
                for message in workflow.messages
            )
            if not has_previous_rounds:
                return "No previous round context available"

            # Get document reference list to show what documents are available from previous rounds
            history = self._getDocumentReferenceList(workflow)["history"]
            if not history:
                return "No previous round context available"

            lines = ["Previous workflow rounds contain documents:\n"]
            for exchange in history:
                doc_list_ref = self._buildDocListReference(workflow, exchange)
                lines.append(f"- {doc_list_ref} ({len(exchange['documents'])} documents)\n")
            return "".join(lines)

        except Exception as e:
            logger.error(f"Error getting workflow history context: {str(e)}")
            return "No previous round context available"

    def _formatExchangeSection(self, workflow, heading: str, exchanges: List[Dict[str, Any]]) -> str:
        """Render ``heading`` followed by each exchange's docList reference and docItem lines."""
        pieces = [heading]
        for exchange in exchanges:
            pieces.append(f"- {self._buildDocListReference(workflow, exchange)} contains:\n")
            for doc_ref in exchange['documents']:
                if doc_ref.startswith("docItem:"):
                    pieces.append(f" - {doc_ref}\n")
                else:
                    # Convert to proper docItem format if needed
                    pieces.append(f" - docItem:{doc_ref}\n")
            pieces.append("\n")
        return "".join(pieces)

    def getAvailableDocuments(self, workflow) -> str:
        """Get available documents formatted for AI prompts (exact copy of old ServiceCenter.getEnhancedDocumentContext)

        Args:
            workflow: Workflow whose messages are indexed. It is used as passed in,
                without reloading from the database.

        Returns:
            Index text listing current-round documents first and past-round
            documents after, or a "no documents" notice.
        """
        try:
            if not workflow or not hasattr(workflow, 'messages'):
                return "No documents available"

            document_list = self._getDocumentReferenceList(workflow)
            chat_exchanges = document_list["chat"]
            history_exchanges = document_list["history"]

            # Build index string for AI action planning
            sections = []
            if chat_exchanges:
                sections.append(self._formatExchangeSection(workflow, "\nCurrent round documents:\n", chat_exchanges))
            if history_exchanges:
                sections.append(self._formatExchangeSection(workflow, "\nPast rounds documents:\n", history_exchanges))
            if not chat_exchanges and not history_exchanges:
                sections.append("\nNO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n")

            return "".join(sections)

        except Exception as e:
            logger.error(f"Error getting available documents: {str(e)}")
            return "NO DOCUMENTS AVAILABLE - Error generating document context."

    def _getDocumentReferenceList(self, workflow) -> Dict[str, List]:
        """Get list of document exchanges with new labeling format, sorted by recency (exact copy of old system)

        Messages are walked newest-first. Each valid document (non-zero fileSize,
        fileId and mimeType present) is listed exactly once, under the newest
        labelled message that contains it.

        Returns:
            ``{"chat": [...], "history": [...]}``. Each entry has ``documentsLabel``,
            ``documents`` (docItem references) and ``messageId``. ``chat`` holds
            messages whose ``roundNumber`` equals ``workflow.currentRound``;
            everything else goes to ``history``.
        """
        # Collect all documents first and refresh their attributes
        all_documents = []
        for message in workflow.messages:
            if message.documents:
                all_documents.extend(message.documents)

        # Refresh file attributes for all documents
        if all_documents:
            self._refreshDocumentFileAttributes(all_documents)

        def _is_valid_document(doc) -> bool:
            try:
                size_ok = getattr(doc, 'fileSize', 0) and getattr(doc, 'fileSize', 0) > 0
                id_ok = bool(getattr(doc, 'fileId', None))
                mime_ok = bool(getattr(doc, 'mimeType', None))
                return size_ok and id_ok and mime_ok
            except Exception:
                return False

        chat_exchanges = []
        history_exchanges = []
        seen_doc_ids = set()
        current_round = getattr(workflow, 'currentRound', None)

        for message in reversed(workflow.messages):
            if not getattr(message, 'documents', None):
                continue

            label = getattr(message, 'documentsLabel', None)
            if not label:
                # Skip messages without a label to keep references consistent
                continue

            doc_refs = []
            for doc in message.documents:
                if not _is_valid_document(doc):
                    continue
                # Avoid duplicates across chat/history
                doc_id = getattr(doc, 'id', None)
                if not doc_id or doc_id in seen_doc_ids:
                    continue
                seen_doc_ids.add(doc_id)
                doc_refs.append(self.getDocumentReferenceFromChatDocument(doc))

            if not doc_refs:
                continue

            entry = {
                'documentsLabel': label,
                'documents': doc_refs,
                # Recorded so references can name this exact message even when
                # several messages share the label.
                'messageId': getattr(message, 'id', None)
            }

            msg_round = getattr(message, 'roundNumber', None)
            if current_round is not None and msg_round == current_round:
                chat_exchanges.append(entry)
            else:
                history_exchanges.append(entry)

        return {
            "chat": chat_exchanges,
            "history": history_exchanges
        }

    def _refreshDocumentFileAttributes(self, documents) -> None:
        """Update file attributes (fileName, fileSize, mimeType) for documents

        Documents that cannot be indexed (no fileId, file not found, or no
        mimeType) get ``fileSize`` set to 0 so they are filtered out later. A
        failure on one document is logged and does not stop the others.
        """
        for doc in documents:
            try:
                # Skip invalid docs early if essential identifiers are missing
                if not getattr(doc, 'fileId', None):
                    logger.debug(f"Skipping document {doc.id} due to missing fileId")
                    doc.fileSize = 0
                    doc.mimeType = None
                    continue

                file_info = self.getFileInfo(doc.fileId)
                if not file_info:
                    logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
                    doc.fileSize = 0
                    doc.mimeType = None
                    continue

                doc.fileName = file_info.get("fileName", doc.fileName)
                doc.fileSize = file_info.get("size", doc.fileSize)
                doc.mimeType = file_info.get("mimeType", doc.mimeType)

                # Mark invalid if missing mimeType
                if not doc.mimeType:
                    logger.debug(f"Document {doc.id} has missing mimeType; will be filtered from index")
                    doc.fileSize = 0
            except Exception as e:
                logger.error(f"Error refreshing file attributes for document {doc.id}: {e}")

    def _generateWorkflowContextPrefix(self, message) -> str:
        """Generate workflow context prefix: round{num}_task{num}_action{num}

        Missing attributes default to round 1, task 0 and action 0.
        """
        round_num = getattr(message, 'roundNumber', 1)
        task_num = getattr(message, 'taskNumber', 0)
        action_num = getattr(message, 'actionNumber', 0)
        return f"round{round_num}_task{task_num}_action{action_num}"

    def getDocumentReferenceFromChatDocument(self, document) -> str:
        """Get document reference using document ID and filename.

        Returns:
            ``docItem:<id>:<fileName>``.

        Raises:
            Whatever reading ``id``/``fileName`` raises; the error is logged and
            re-raised so the workflow does not continue with invalid data.
        """
        try:
            return f"docItem:{document.id}:{document.fileName}"
        except Exception as e:
            logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}")
            raise

    def _getMessageSequenceForExchange(self, exchange, workflow) -> int:
        """Get message sequence number for sorting exchanges by recency

        The owning message is located through the exchange's first document
        reference (``docItem:<id>:...`` or ``docList:<messageId>:...``).

        Returns:
            The message's ``sequenceNr``, or 0 when the message cannot be found,
            has no ``sequenceNr``, or an error occurs.
        """
        try:
            documents = exchange['documents']
            if not documents:
                return 0

            first_doc_ref = documents[0]
            if first_doc_ref.startswith("docItem:"):
                doc_id = first_doc_ref.split(':')[1]
                # Find the message containing this document
                for message in workflow.messages:
                    for doc in message.documents or []:
                        if str(doc.id) == doc_id:
                            return getattr(message, 'sequenceNr', 0)
            elif first_doc_ref.startswith("docList:"):
                message_id = first_doc_ref.split(':')[1]
                # Find the message by ID
                for message in workflow.messages:
                    if str(message.id) == message_id:
                        return getattr(message, 'sequenceNr', 0)
            return 0
        except Exception as e:
            logger.error(f"Error getting message sequence for exchange: {str(e)}")
            return 0

    def _validateDocumentLabelConsistency(self, message) -> Optional[str]:
        """Validate that the document label used for references matches the message's actual label

        Returns the message's ``documentsLabel`` unchanged, or None when it is
        missing or empty. No correction is attempted.
        """
        return getattr(message, 'documentsLabel', None) or None

    def getConnectionReferenceList(self) -> List[str]:
        """Get connection reference list (matching old handlingTasks.py logic)

        Returns:
            One reference string per user connection, in the format accepted by
            ``getUserConnectionFromConnectionReference``. An empty list is
            returned when there are no connections or the lookup fails.
        """
        try:
            if hasattr(self.services, 'interfaceDbApp') and hasattr(self.services, 'user'):
                connections = self.services.interfaceDbApp.getUserConnections(self.services.user.id)
                if connections:
                    return [self.getConnectionReferenceFromUserConnection(conn) for conn in connections]
            return []
        except Exception as e:
            logger.error(f"Error getting connection reference list: {str(e)}")
            return []

    def createProgressLogger(self, workflow) -> ProgressLogger:
        """Create a ProgressLogger bound to this service and the given workflow."""
        return ProgressLogger(self, workflow)