gateway/modules/chat/serviceCenter.py
2025-09-01 21:35:04 +02:00

1313 lines
64 KiB
Python

import logging
import importlib
import pkgutil
import inspect
import os
from typing import Dict, Any, List, Optional
from modules.interfaces.interfaceAppModel import User, UserConnection
from modules.interfaces.interfaceChatModel import (
TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent
)
from modules.interfaces.interfaceAiCalls import AiCalls
from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects
from modules.interfaces.interfaceChatModel import ActionResult
from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects
from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects
from modules.chat.documents.documentExtraction import DocumentExtraction
from modules.chat.methodBase import MethodBase
from modules.shared.timezoneUtils import get_utc_timestamp
import uuid
import asyncio
# Module-level logger for the service center.
logger = logging.getLogger(__name__)
class ServiceCenter:
"""Service center that provides access to all services and their functions"""
def __init__(self, currentUser: User, workflow: ChatWorkflow):
    """Create a service center bound to one user and one chat workflow.

    Args:
        currentUser: User on whose behalf the chat/component/app interfaces are created.
        workflow: Workflow this service center operates on; its ``tasks`` are
            exposed as ``self.tasks``.

    The action catalogue ``self.methods`` is filled by ``_discoverMethods`` as
    the last construction step.
    """
    # Core context
    self.user = currentUser
    self.workflow = workflow
    self.tasks = workflow.tasks
    self.statusEnums = TaskStatus
    self.currentTask = None  # no task is active until one is assigned

    # Interfaces created for the current user
    self.interfaceChat = getChatObjects(currentUser)
    self.interfaceComponent = getComponentObjects(currentUser)
    self.interfaceApp = getAppObjects(currentUser)

    # AI access and document extraction (the extractor receives this service center)
    self.interfaceAiCalls = AiCalls()
    self.documentProcessor = DocumentExtraction(self)

    # Catalogue of discovered method classes, keyed by method name
    self.methods: Dict[str, Dict[str, Any]] = {}
    self._discoverMethods()
def _discoverMethods(self):
    """Discover method classes in the ``modules.methods`` package and register them.

    Every non-package module whose name starts with ``method`` is imported. Each
    class in it that subclasses ``MethodBase`` (but is not ``MethodBase`` itself)
    is instantiated with this service center. It is stored in
    ``self.methods[instance.name]`` together with its description and the
    action catalogue built by ``_buildActionCatalog``.

    A failure while loading one module is logged (with traceback) and that
    module is skipped. A failure to import the package itself is logged and
    leaves ``self.methods`` unchanged.
    """
    try:
        methodsPackage = importlib.import_module('modules.methods')
        for _, moduleName, isPkg in pkgutil.iter_modules(methodsPackage.__path__):
            if isPkg or not moduleName.startswith('method'):
                continue
            try:
                module = importlib.import_module(f'modules.methods.{moduleName}')
                for _, candidate in inspect.getmembers(module, inspect.isclass):
                    if candidate is MethodBase or not issubclass(candidate, MethodBase):
                        continue
                    methodInstance = candidate(self)
                    actions = self._buildActionCatalog(methodInstance)
                    self.methods[methodInstance.name] = {
                        'instance': methodInstance,
                        'description': methodInstance.description,
                        'actions': actions
                    }
                    logger.info(f"Discovered method: {methodInstance.name} with {len(actions)} actions")
            except Exception as e:
                logger.error(f"Error loading method module {moduleName}: {str(e)}", exc_info=True)
    except Exception as e:
        logger.error(f"Error discovering methods: {str(e)}", exc_info=True)

def _buildActionCatalog(self, methodInstance) -> Dict[str, Dict[str, Any]]:
    """Describe every public coroutine method of ``methodInstance`` as an action.

    Returns a mapping ``actionName -> {'description', 'parameters', 'method'}``.
    ``method`` is the coroutine bound to ``methodInstance``. ``parameters`` maps
    each parameter name (except ``self``) to its annotated type (``Any`` if
    unannotated), whether it is required, a description, and its default.
    """
    methodClass = type(methodInstance)
    actions = {}
    for actionName, function in inspect.getmembers(methodClass, predicate=inspect.iscoroutinefunction):
        if actionName.startswith('_'):
            continue
        params = {}
        for paramName, param in inspect.signature(function).parameters.items():
            if paramName == 'self':
                continue
            # Identity checks: `==` against Parameter.empty can misbehave for
            # defaults that define their own __eq__ (e.g. array-like values).
            hasAnnotation = param.annotation is not param.empty
            hasDefault = param.default is not param.empty
            paramDesc = None
            if hasDefault and hasattr(param.default, '__doc__'):
                # NOTE(review): every object has __doc__, so a plain default such as
                # an int or str yields its *type's* docstring here; confirm intent.
                paramDesc = param.default.__doc__
            params[paramName] = {
                'type': param.annotation if hasAnnotation else Any,
                'required': not hasDefault,
                'description': paramDesc,
                'default': param.default if hasDefault else None
            }
        actions[actionName] = {
            'description': function.__doc__ or '',
            'parameters': params,
            'method': function.__get__(methodInstance, methodClass)
        }
    return actions
def detectContentTypeFromData(self, fileData: bytes, fileName: str) -> str:
    """
    Detect the MIME type of a file from its name and, failing that, its content.

    The file extension is looked up in a fixed table first. If the name has no
    known extension (or no name is given), a few magic-byte signatures are
    sniffed from the data instead.
    Args:
        fileData: Raw file data as bytes
        fileName: Name of the file; may be empty or None, in which case only
            the content is inspected
    Returns:
        str: Detected MIME type, or 'application/octet-stream' if unknown
    """
    try:
        ext = os.path.splitext(fileName)[1].lower() if fileName else ''
        extToMime = {
            '.txt': 'text/plain',
            '.md': 'text/markdown',
            '.csv': 'text/csv',
            '.json': 'application/json',
            '.xml': 'application/xml',
            '.js': 'application/javascript',
            # NOTE(review): getMimeTypeFromExtension maps 'py' to 'text/x-python'.
            '.py': 'application/x-python',
            '.svg': 'image/svg+xml',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.bmp': 'image/bmp',
            '.webp': 'image/webp',
            '.pdf': 'application/pdf',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.doc': 'application/msword',
            '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            '.xls': 'application/vnd.ms-excel',
            '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
            '.ppt': 'application/vnd.ms-powerpoint',
            '.html': 'text/html',
            '.htm': 'text/html',
            '.css': 'text/css',
            '.zip': 'application/zip',
            '.rar': 'application/x-rar-compressed',
            '.7z': 'application/x-7z-compressed',
            '.tar': 'application/x-tar',
            '.gz': 'application/gzip'
        }
        if ext in extToMime:
            return extToMime[ext]
        if not fileData:
            return 'application/octet-stream'
        # Fall back to content sniffing via magic bytes
        if fileData.startswith(b'%PDF'):
            return 'application/pdf'
        if fileData.startswith(b'PK\x03\x04'):
            # ZIP container; docx/xlsx/pptx cannot be told apart at this level
            return 'application/zip'
        if fileData.startswith(b'<'):
            # Markup: decode(errors='ignore') cannot raise, so no try is needed
            text = fileData.decode('utf-8', errors='ignore').lower()
            if '<svg' in text:
                return 'image/svg+xml'
            if '<html' in text:
                return 'text/html'
            return 'application/xml'
        if fileData.startswith(b'\x89PNG\r\n\x1a\n'):
            return 'image/png'
        if fileData.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if fileData.startswith((b'GIF87a', b'GIF89a')):
            return 'image/gif'
        if fileData.startswith(b'BM'):
            return 'image/bmp'
        if fileData.startswith(b'RIFF') and fileData[8:12] == b'WEBP':
            return 'image/webp'
        return 'application/octet-stream'
    except Exception as e:
        logger.error(f"Error detecting content type from data: {str(e)}")
        return 'application/octet-stream'
def getMimeTypeFromExtension(self, extension: str) -> str:
    """
    Get the MIME type for a file extension.

    The lookup is case-insensitive and accepts the extension with or without a
    single leading dot. Unknown, empty or None extensions map to
    'application/octet-stream'.
    Args:
        extension: File extension (with or without dot), e.g. "pdf" or ".PDF"
    Returns:
        str: MIME type for the extension
    """
    if not extension:
        return 'application/octet-stream'
    # Normalize: drop one leading dot and compare case-insensitively
    normalized = (extension[1:] if extension.startswith('.') else extension).lower()
    mime_types = {
        'txt': 'text/plain',
        'json': 'application/json',
        'xml': 'application/xml',
        'csv': 'text/csv',
        'html': 'text/html',
        'htm': 'text/html',
        'md': 'text/markdown',
        'py': 'text/x-python',
        'js': 'application/javascript',
        'css': 'text/css',
        'pdf': 'application/pdf',
        'doc': 'application/msword',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'xls': 'application/vnd.ms-excel',
        'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        'ppt': 'application/vnd.ms-powerpoint',
        'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        'svg': 'image/svg+xml',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'bmp': 'image/bmp',
        'webp': 'image/webp',
        'zip': 'application/zip',
        'rar': 'application/x-rar-compressed',
        '7z': 'application/x-7z-compressed',
        'tar': 'application/x-tar',
        'gz': 'application/gzip'
    }
    return mime_types.get(normalized, 'application/octet-stream')
def getFileExtension(self, fileName: str) -> str:
    """
    Extract the file extension from fileName (without dot, lowercased).

    Only the part after the last dot is returned, e.g. "archive.tar.GZ" -> "gz".
    Args:
        fileName: Name of the file
    Returns:
        str: File extension without the leading dot, or '' if the name
        contains no dot
    """
    if '.' in fileName:
        return fileName.rsplit('.', 1)[-1].lower()
    return ''
# ===== Functions =====
def getMethodsList(self) -> List[str]:
    """Return the action signatures of all discovered methods.

    Each signature comes from the owning instance's ``getActionSignature``.
    Actions for which it returns a falsy value are left out. Ordering follows
    ``self.methods`` and then each method's ``actions`` mapping.
    """
    signatures = []
    for entry in self.methods.values():
        owner = entry['instance']
        for actionName in entry['actions']:
            rendered = owner.getActionSignature(actionName)
            if rendered:
                signatures.append(rendered)
    return signatures
def generateDocumentLabel(self, document: ChatDocument, message: ChatMessage) -> str:
    """Generate a document label of the form ``round<R>_task<T>_action<A>_<fileName>``.

    Round/task/action numbers are read from ``message`` and default to 1/0/0
    when the message lacks those attributes.

    If reading ``document.fileName`` (or computing its extension) fails, the
    document is diagnosed and ``recoverDocumentAccess`` is attempted once.

    Raises:
        RuntimeError: if the fileName is still unreadable after the recovery attempt.
    """
    try:
        round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1
        task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0
        action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0
        try:
            # getFileExtension is called as a validity check: any failure reading
            # fileName or computing its extension triggers the recovery path below.
            self.getFileExtension(document.fileName)
            filename = document.fileName
        except Exception:
            diagnosis = self.diagnoseDocumentAccess(document)
            logger.error(f"Critical error: Cannot access document fileName for document {document.id}. Diagnosis: {diagnosis}")
            if not self.recoverDocumentAccess(document):
                # Recovery failed - don't continue with invalid data
                raise RuntimeError(f"Document {document.id} is inaccessible and recovery failed. Diagnosis: {diagnosis}")
            try:
                self.getFileExtension(document.fileName)
                filename = document.fileName
                logger.info(f"Document access recovered for {document.id}")
            except Exception as recovery_error:
                logger.error(f"Recovery failed for document {document.id}: {str(recovery_error)}")
                raise RuntimeError(f"Document {document.id} is permanently inaccessible after recovery attempt: {str(recovery_error)}") from recovery_error
        return f"round{round_num}_task{task_num}_action{action_num}_{filename}"
    except Exception as e:
        logger.error(f"Critical error generating document label for document {document.id}: {str(e)}")
        # Re-raise so the workflow does not continue with invalid data
        raise
def getDocumentReferenceList(self) -> Dict[str, List[DocumentExchange]]:
    """Group the workflow's documents into DocumentExchanges for the current round and history.

    File attributes of all documents are refreshed first. Messages are then
    walked newest-first. Exchanges from messages up to and including the newest
    message with status "first" go to ``"chat"``; older ones go to ``"history"``.

    An exchange keeps the message's ``documentsLabel`` when the message has both
    an ``actionId`` and a label. Otherwise a
    ``round<R>_task<T>_action<A>_context`` label is synthesized from the message.
    Both lists are sorted most-recent-first via ``_getMessageSequenceForExchange``.

    Returns:
        Dict with keys "chat" and "history", each a list of DocumentExchange.
    """
    all_documents = [
        doc
        for message in self.workflow.messages if message.documents
        for doc in message.documents
    ]
    if all_documents:
        self.refreshDocumentFileAttributes(all_documents)

    chat_exchanges = []
    history_exchanges = []
    in_current_round = True
    for message in reversed(self.workflow.messages):
        is_first = message.status == "first" if hasattr(message, 'status') else False
        if message.documents:
            if message.actionId and message.documentsLabel:
                label = message.documentsLabel
            else:
                # No explicit label: derive one from the message's workflow position
                round_num = message.roundNumber if hasattr(message, 'roundNumber') else 1
                task_num = message.taskNumber if hasattr(message, 'taskNumber') else 0
                action_num = message.actionNumber if hasattr(message, 'actionNumber') else 0
                label = f"round{round_num}_task{task_num}_action{action_num}_context"
            doc_refs = [self.getDocumentReferenceFromChatDocument(doc, message) for doc in message.documents]
            target = chat_exchanges if in_current_round else history_exchanges
            target.append(DocumentExchange(documentsLabel=label, documents=doc_refs))
        # The "first" message itself still belongs to the current round
        if in_current_round and is_first:
            in_current_round = False

    chat_exchanges.sort(key=self._getMessageSequenceForExchange, reverse=True)
    history_exchanges.sort(key=self._getMessageSequenceForExchange, reverse=True)
    return {
        "chat": chat_exchanges,
        "history": history_exchanges
    }
def _getMessageSequenceForExchange(self, exchange: DocumentExchange) -> int:
    """Return the sequence number of the message an exchange refers to, for sorting.

    Only the first entry of ``exchange.documents`` is inspected:
    - ``docItem:<docId>:...``: the first workflow message containing that document
    - ``docList:<messageId>:...``: the message with that id

    Ids are compared as strings because the reference encodes them as text.
    Returns 0 when nothing matches, when the message's sequenceNr is missing or
    None, or on error, so the result is always usable as a sort key.
    """
    try:
        if not exchange.documents:
            return 0
        first_doc_ref = exchange.documents[0]
        if first_doc_ref.startswith("docItem:"):
            doc_id = first_doc_ref.split(':')[1]
            for message in self.workflow.messages:
                for doc in message.documents or []:
                    if str(doc.id) == doc_id:
                        return getattr(message, 'sequenceNr', None) or 0
        elif first_doc_ref.startswith("docList:"):
            message_id = first_doc_ref.split(':')[1]
            for message in self.workflow.messages:
                if str(message.id) == message_id:
                    return getattr(message, 'sequenceNr', None) or 0
        return 0
    except Exception as e:
        logger.error(f"Error getting message sequence for exchange: {str(e)}")
        return 0
def getEnhancedDocumentContext(self) -> str:
    """Render the workflow's documents as a text block for action-planning prompts.

    Current-round exchanges are listed under "CURRENT ROUND DOCUMENTS:" and
    older ones under "WORKFLOW HISTORY DOCUMENTS:". Each exchange becomes a
    ``docList:<documentsLabel>`` line followed by one ``docItem:`` line per
    document (references lacking the prefix get it added) and a blank line.

    Returns:
        str: The formatted context, or a "NO DOCUMENTS AVAILABLE" notice when
        there are no documents or building the context fails.
    """
    try:
        document_list = self.getDocumentReferenceList()
        sections = (
            ("CURRENT ROUND DOCUMENTS:\n", document_list["chat"]),
            ("WORKFLOW HISTORY DOCUMENTS:\n", document_list["history"]),
        )
        parts = ["AVAILABLE DOCUMENTS:\n\n"]
        for heading, exchanges in sections:
            if not exchanges:
                continue
            parts.append(heading)
            for exchange in exchanges:
                # The docList reference carries the exchange label, not a message id.
                # NOTE(review): confirm the resolver used for planned actions accepts it.
                parts.append(f"- docList:{exchange.documentsLabel} contains:\n")
                for doc_ref in exchange.documents:
                    item_ref = doc_ref if doc_ref.startswith("docItem:") else f"docItem:{doc_ref}"
                    parts.append(f" - {item_ref}\n")
                parts.append("\n")
        if not document_list["chat"] and not document_list["history"]:
            parts.append("NO DOCUMENTS AVAILABLE - This workflow has no documents to process.\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Error generating enhanced document context: {str(e)}")
        return "NO DOCUMENTS AVAILABLE - Error generating document context."
def _extractDocumentInfoFromReference(self, doc_ref: str) -> Optional[Dict[str, str]]:
    """Extract a display filename and context from a ``docItem:``/``docList:`` reference.

    The third ``:``-separated field is treated as the label. Any further ``:``
    belongs to the label, so filenames containing colons are kept intact. A
    label of the form ``round<R>_task<T>_action<A>_<rest>`` yields
    ``{'filename': rest, 'context': 'Round R, Task T, Action A'}``. Any other
    label is returned as the filename with context 'Unknown context' (docItem)
    or 'Message context' (docList).

    Returns:
        The info dict, or None for unsupported/short references or on error.
    """
    try:
        if doc_ref.startswith("docItem:"):
            fallback_context = 'Unknown context'
        elif doc_ref.startswith("docList:"):
            fallback_context = 'Message context'
        else:
            return None
        parts = doc_ref.split(':', 2)
        if len(parts) < 3:
            return None
        label = parts[2]
        if label.startswith("round"):
            label_parts = label.split('_', 3)
            if len(label_parts) >= 4:
                round_num = label_parts[0].replace('round', '')
                task_num = label_parts[1].replace('task', '')
                action_num = label_parts[2].replace('action', '')
                return {
                    'filename': label_parts[3],
                    'context': f"Round {round_num}, Task {task_num}, Action {action_num}"
                }
        return {
            'filename': label,
            'context': fallback_context
        }
    except Exception as e:
        logger.error(f"Error extracting document info from reference: {str(e)}")
        return None
def getDocumentReferenceFromChatDocument(self, document: ChatDocument, message: ChatMessage) -> str:
    """Build the ``docItem:<documentId>:<fileName>`` reference for a document.

    ``message`` is part of the signature but is not used. If reading the
    document fails, the error is logged and re-raised so the workflow does not
    continue with an invalid reference.
    """
    try:
        documentId = document.id
        documentName = document.fileName
        return f"docItem:{documentId}:{documentName}"
    except Exception as e:
        logger.error(f"Critical error creating document reference for document {document.id}: {str(e)}")
        raise
def getDocumentListReferenceFromChatMessage(self, message: ChatMessage) -> str:
    """Build the ``docList:<messageId>:<label>`` reference for a message's documents.

    The label is the message's ``documentsLabel``. When that attribute is
    missing, None or empty, ``message_<messageId>`` is used instead, so the
    reference never embeds the text "None". Errors are logged and re-raised.
    """
    try:
        label = getattr(message, 'documentsLabel', None) or f"message_{message.id}"
        return f"docList:{message.id}:{label}"
    except Exception as e:
        logger.error(f"Critical error creating document list reference for message {message.id}: {str(e)}")
        # Re-raise the error to prevent workflow from continuing with invalid data
        raise
def getChatDocumentsFromDocumentList(self, documentList: List[str]) -> List[ChatDocument]:
    """Resolve document references into ChatDocument objects.

    Supported reference formats:
    - ``docItem:<documentId>:<fileName>``: the first document (in workflow
      message order) with that id, added once.
    - ``docList:<messageId>:<label>``: all documents attached to that message.
    - ``round<R>_task<T>_action<A>_<context>``: documents of the newest message
      labelled exactly so, else of the newest message whose label starts with it.

    Unresolvable references are skipped without affecting the others. Returns
    an empty list if an unexpected error occurs.
    """
    try:
        all_documents = []
        for doc_ref in documentList:
            if doc_ref.startswith("docItem:"):
                document = self._resolveDocItemReference(doc_ref)
                if document is not None:
                    all_documents.append(document)
            elif doc_ref.startswith("docList:"):
                all_documents.extend(self._resolveDocListReference(doc_ref))
            elif doc_ref.startswith("round"):
                all_documents.extend(self._resolveRoundLabelReference(doc_ref))
        logger.debug(f"Resolved {len(all_documents)} documents from document list: {documentList}")
        return all_documents
    except Exception as e:
        logger.error(f"Error getting documents from document list: {str(e)}")
        return []

def _resolveDocItemReference(self, doc_ref: str) -> Optional[ChatDocument]:
    """Return the first workflow document whose id matches ``docItem:<id>:...``, or None."""
    parts = doc_ref.split(':')
    if len(parts) < 2:
        return None
    doc_id = parts[1]
    for message in self.workflow.messages:
        for doc in message.documents or []:
            # Compare as strings: the reference encodes the id as text
            if str(doc.id) == doc_id:
                return doc
    return None

def _resolveDocListReference(self, doc_ref: str) -> List[ChatDocument]:
    """Return all documents of the message referenced by ``docList:<messageId>:...``."""
    parts = doc_ref.split(':')
    if len(parts) < 2:
        return []
    message_id = parts[1]
    for message in self.workflow.messages:
        if str(message.id) == message_id:
            return message.documents or []
    return []

def _resolveRoundLabelReference(self, doc_ref: str) -> List[ChatDocument]:
    """Resolve a ``round<R>_task<T>_action<A>_<context>`` label to documents.

    Prefers messages whose documentsLabel equals ``doc_ref``. If there are none,
    falls back to messages whose label starts with it. Among the candidates, the
    newest (highest publishedAt) wins, because retries can produce several
    messages with the same label. Returns [] if the label has fewer than four
    ``_``-separated parts or nothing matches.
    """
    if len(doc_ref.split('_', 3)) < 4:
        return []
    logger.debug(f"Resolving round reference: {doc_ref}")
    messages = self.workflow.messages
    candidates = [m for m in messages if getattr(m, 'documentsLabel', '') == doc_ref]
    source = "matching"
    if not candidates:
        logger.debug(f"No messages found with documentsLabel: {doc_ref}")
        logger.debug(f"Trying fallback search for messages with documentsLabel containing: {doc_ref}")
        candidates = [m for m in messages if (getattr(m, 'documentsLabel', '') or '').startswith(doc_ref)]
        source = "fallback"
    if not candidates:
        logger.debug("No fallback messages found either")
        return []
    # A missing/None publishedAt counts as 0 instead of breaking the comparison
    newest = max(candidates, key=lambda m: getattr(m, 'publishedAt', 0) or 0)
    logger.debug(f"Found {len(candidates)} {source} messages, using newest: {newest.id} (publishedAt: {getattr(newest, 'publishedAt', 'unknown')})")
    if not newest.documents:
        logger.debug(f"No documents found in {source} message {newest.id}")
        return []
    logger.debug(f"Added {len(newest.documents)} documents from {source} message {newest.id}")
    return list(newest.documents)
def getConnectionReferenceList(self) -> List[str]:
    """Return sorted connection references, with status/token state, for the current user.

    Each entry is built by ``getConnectionReferenceFromUserConnection``, which
    requests tokens with ``auto_refresh=True``. References containing
    "refreshed" are counted, and the count is logged at INFO level. A None
    result from ``getUserConnections`` is treated as no connections.
    """
    user_connections = self.interfaceApp.getUserConnections(self.user.id) or []
    logger.debug(f"getConnectionReferenceList: User ID: {self.user.id}, connections: {len(user_connections)}")
    connections = []
    refreshed_count = 0
    for conn in user_connections:
        enhanced_ref = self.getConnectionReferenceFromUserConnection(conn)
        logger.debug(f"getConnectionReferenceList: Enhanced ref for connection {conn.id}: {enhanced_ref}")
        connections.append(enhanced_ref)
        if "refreshed" in enhanced_ref:
            refreshed_count += 1
    if refreshed_count > 0:
        logger.info(f"Refreshed {refreshed_count} connection tokens while building action planning prompt")
    return sorted(connections)
def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str:
    """Build a connection reference annotated with connection and token state.

    Format: ``connection:<authority>:<externalUsername>:<id> [status:<status>, token:<token_status>]``.
    token_status is one of "valid", "valid (refreshed)", "expired",
    "no_expiration", "no_token" or "error: <message>".

    The token is fetched with ``auto_refresh=True``, so calling this may refresh
    an expired token. A token is reported as "(refreshed)" only when its
    ``createdAt`` is known and lies within the refresh window. A token without
    creation info is reported as plain "valid".
    """
    # 5-minute window (seconds) for treating a token as freshly refreshed
    refresh_window_seconds = 300
    try:
        logger.debug(f"Getting connection token for connection {connection.id} with auto_refresh=True")
        token = self.interfaceApp.getConnectionToken(connection.id, auto_refresh=True)
        if not token:
            token_status = "no_token"
        elif not getattr(token, 'expiresAt', None):
            token_status = "no_expiration"
        else:
            current_time = get_utc_timestamp()
            logger.debug(f"getConnectionReferenceFromUserConnection: Current time: {current_time}")
            logger.debug(f"getConnectionReferenceFromUserConnection: Token expires at: {token.expiresAt}")
            if current_time > token.expiresAt:
                token_status = "expired"
            else:
                created_at = getattr(token, 'createdAt', None)
                if created_at is not None and current_time - created_at < refresh_window_seconds:
                    token_status = "valid (refreshed)"
                else:
                    token_status = "valid"
    except Exception as e:
        token_status = f"error: {str(e)}"
    base_ref = f"connection:{connection.authority.value}:{connection.externalUsername}:{connection.id}"
    state_info = f" [status:{connection.status.value}, token:{token_status}]"
    logger.debug(f"getConnectionReferenceFromUserConnection: Built reference: {base_ref + state_info}")
    return base_ref + state_info
def getUserConnectionFromConnectionReference(self, connectionReference: str) -> Optional[UserConnection]:
    """Look up the current user's UserConnection referenced by a connection reference string.

    Accepts ``connection:<authority>:<username>:<id>`` with or without the
    trailing `` [status:..., token:...]`` state suffix. The id is taken from the
    last field, so usernames that contain ``:`` are supported. Usernames are
    compared in their string form, matching how the reference is rendered.

    Returns:
        The matching UserConnection, or None if the reference is malformed,
        nothing matches, or an error occurs.
    """
    try:
        # Drop the optional " [status:..., token:...]" suffix
        base_reference = connectionReference.split(' [')[0]
        head = base_reference.split(':', 2)
        if len(head) != 3 or head[0] != "connection":
            return None
        authority = head[1]
        username, separator, conn_id = head[2].rpartition(':')
        if not separator:
            return None
        user_connections = self.interfaceApp.getUserConnections(self.user.id) or []
        for conn in user_connections:
            if (str(conn.id) == conn_id
                    and conn.authority.value == authority
                    and str(conn.externalUsername) == username):
                return conn
        return None
    except Exception as e:
        logger.error(f"Error parsing connection reference: {str(e)}")
        return None
async def summarizeChat(self, messages: List[ChatMessage]) -> str:
    """
    Summarize the latest part of a chat with the basic text AI.

    Messages are collected from the end of ``messages`` backwards, up to and
    including the newest one whose status is "first". They are then listed
    chronologically in the prompt.
    Args:
        messages: List of chat messages to summarize
    Returns:
        str: The AI summary (requested in the user's language), or an
        "Error summarizing chat: ..." string if anything fails
    """
    try:
        recent = []
        for entry in reversed(messages):
            recent.append(entry)
            if entry.status == "first":
                break  # round boundary reached (boundary message included)
        recent.reverse()  # restore chronological order
        language = self.user.language
        historyText = "\n".join(f"- {entry.message}" for entry in recent)
        prompt = f"""You are an AI assistant providing a summary of a chat conversation.
Please respond in '{language}' language.
Chat History:
{historyText}
Instructions:
1. Summarize the conversation's key points and outcomes
2. Be concise but informative
3. Use a professional but friendly tone
4. Focus on important decisions and next steps if any
Please provide a comprehensive summary of this conversation."""
        return await self.callAiTextBasic(prompt)
    except Exception as e:
        logger.error(f"Error summarizing chat: {str(e)}")
        return f"Error summarizing chat: {str(e)}"
async def callAiTextAdvanced(self, prompt: str, context: str = None, *, max_retries: int = 3, base_delay: float = 2) -> str:
    """Advanced text processing using Anthropic, with fallback to OpenAI basic if advanced fails.

    The advanced call is attempted up to ``max_retries`` times with exponential
    backoff (``base_delay``, ``2*base_delay``, ...). Traffic is recorded under
    "aicall.anthropic.text". If every attempt fails, ``callAiTextBasic`` is
    called exactly once. It applies its own retry policy, so it is not wrapped
    in a second retry loop here.

    Args:
        prompt: Prompt text.
        context: Optional additional context passed through to the AI call.
        max_retries: Attempts for the advanced model (at least 1).
        base_delay: Initial backoff delay in seconds.
    Raises:
        Exception: "All AI calls failed: ..." when the fallback also fails.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            prompt_size = self.calculateObjectSize(prompt)
            if context:
                prompt_size += self.calculateObjectSize(context)
            response = await self.interfaceAiCalls.callAiTextAdvanced(prompt, context)
            response_size = self.calculateObjectSize(response)
            self.updateWorkflowStats(eventLabel="aicall.anthropic.text", bytesSent=prompt_size, bytesReceived=response_size)
            return response
        except Exception as e:
            logger.warning(f"Advanced AI call failed (attempt {attempt+1}/{attempts}): {str(e)}")
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    logger.info("Falling back to basic AI after advanced AI failed.")
    try:
        return await self.callAiTextBasic(prompt, context)
    except Exception as last_error:
        logger.error(f"All AI calls failed: {str(last_error)}")
        raise Exception(f"All AI calls failed: {str(last_error)}") from last_error
async def callAiTextBasic(self, prompt: str, context: str = None, *, max_retries: int = 3, base_delay: float = 2) -> str:
    """Basic text processing using OpenAI, with retry logic.

    Up to ``max_retries`` attempts with exponential backoff (``base_delay``,
    ``2*base_delay``, ...). On success, traffic is recorded under
    "aicall.openai.text".

    Args:
        prompt: Prompt text.
        context: Optional additional context passed through to the AI call.
        max_retries: Number of attempts (at least 1).
        base_delay: Initial backoff delay in seconds.
    Raises:
        Exception: "Basic AI call failed after N attempts: ..." chained to the last error.
    """
    attempts = max(1, max_retries)
    last_error = None
    for attempt in range(attempts):
        try:
            prompt_size = self.calculateObjectSize(prompt)
            if context:
                prompt_size += self.calculateObjectSize(context)
            response = await self.interfaceAiCalls.callAiTextBasic(prompt, context)
            response_size = self.calculateObjectSize(response)
            self.updateWorkflowStats(eventLabel="aicall.openai.text", bytesSent=prompt_size, bytesReceived=response_size)
            return response
        except Exception as e:
            last_error = e
            logger.warning(f"Basic AI call failed (attempt {attempt+1}/{attempts}): {str(e)}")
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    logger.error(f"Basic AI call failed after {attempts} attempts: {str(last_error)}")
    raise Exception(f"Basic AI call failed after {attempts} attempts: {str(last_error)}") from last_error
async def callAiImageBasic(self, prompt: str, imageData: str, mimeType: str) -> str:
    """Analyse an image with the basic (OpenAI) model and record traffic stats.

    Bytes sent are the sizes of ``prompt`` plus ``imageData``. Bytes received
    are the size of the response. Both are recorded under "aicall.openai.image".

    Returns:
        The model's response.
    """
    bytesOut = self.calculateObjectSize(prompt) + self.calculateObjectSize(imageData)
    answer = await self.interfaceAiCalls.callAiImageBasic(prompt, imageData, mimeType)
    self.updateWorkflowStats(
        eventLabel="aicall.openai.image",
        bytesSent=bytesOut,
        bytesReceived=self.calculateObjectSize(answer),
    )
    return answer
async def callAiImageAdvanced(self, prompt: str, imageData: str, mimeType: str) -> str:
    """Analyse an image with the advanced (Anthropic) model and record traffic stats.

    Bytes sent are the sizes of ``prompt`` plus ``imageData``. Bytes received
    are the size of the response. Both are recorded under "aicall.anthropic.image".

    Returns:
        The model's response.
    """
    bytesOut = self.calculateObjectSize(prompt) + self.calculateObjectSize(imageData)
    answer = await self.interfaceAiCalls.callAiImageAdvanced(prompt, imageData, mimeType)
    self.updateWorkflowStats(
        eventLabel="aicall.anthropic.image",
        bytesSent=bytesOut,
        bytesReceived=self.calculateObjectSize(answer),
    )
    return answer
def getFileInfo(self, fileId: str) -> Optional[Dict[str, Any]]:
    """Return metadata for a stored file, or None if the component interface finds no file.

    Returns:
        Dict with keys "id", "fileName", "size" (from ``fileSize``), "mimeType",
        "fileHash" and "creationDate"; None when ``getFile`` returns nothing.
    """
    file_item = self.interfaceComponent.getFile(fileId)
    if not file_item:
        return None
    return {
        "id": file_item.id,
        "fileName": file_item.fileName,
        "size": file_item.fileSize,
        "mimeType": file_item.mimeType,
        "fileHash": file_item.fileHash,
        "creationDate": file_item.creationDate
    }
def getFileData(self, fileId: str) -> bytes:
    """Return the raw content of the stored file ``fileId`` via the component interface."""
    component = self.interfaceComponent
    return component.getFileData(fileId)
async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent:
    """Extract content from the file referenced by a ChatDocument, guided by a prompt.

    A ChatDocument only references a stored file. The raw bytes are therefore
    fetched with ``self.getFileData(document.fileId)`` and passed, together with
    the document's fileName and mimeType, to
    ``self.documentProcessor.processFileData``.

    Args:
        prompt: Extraction instructions passed through to the document processor.
        document: Document reference; must carry a non-empty ``fileId``.

    Returns:
        The result of ``documentProcessor.processFileData`` (annotated as
        ExtractedContent).

    Raises:
        ValueError: The document has no fileId, or no file data was found.
        RuntimeError: fileName/mimeType could not be read, even after an
            attempt to recover document access. The underlying error is
            chained as ``__cause__``.
        Exception: Any error from the document processor is logged and re-raised.
    """
    try:
        # ChatDocument is just a reference, so the file data is loaded via fileId.
        fileId = getattr(document, 'fileId', None)
        if not fileId:
            logger.error(f"Document {document.id} has no fileId")
            raise ValueError("Document has no fileId")

        fileData = self.getFileData(fileId)
        if not fileData:
            logger.error(f"No file data found for fileId: {fileId}")
            raise ValueError("No file data found for document")

        try:
            fileName = document.fileName
            mimeType = document.mimeType
        except Exception as accessError:
            # Presumably these properties depend on the document's component
            # interface (see diagnoseDocumentAccess / recoverDocumentAccess):
            # diagnose, try to re-attach it, and only then give up.
            diagnosis = self.diagnoseDocumentAccess(document)
            logger.error(f"Critical error: Cannot access document properties for document {document.id} ({accessError}). Diagnosis: {diagnosis}")
            if not self.recoverDocumentAccess(document):
                # Recovery failed - don't continue with invalid data
                raise RuntimeError(f"Document {document.id} properties are inaccessible and recovery failed. Diagnosis: {diagnosis}") from accessError
            try:
                fileName = document.fileName
                mimeType = document.mimeType
            except Exception as recovery_error:
                logger.error(f"Recovery failed for document {document.id}: {str(recovery_error)}")
                raise RuntimeError(f"Document {document.id} properties are permanently inaccessible after recovery attempt - cannot proceed with AI extraction: {str(recovery_error)}") from recovery_error
            logger.info(f"Document access recovered for {document.id} - proceeding with AI extraction")

        # The file data loaded above is raw bytes, hence base64Encoded=False.
        extractedContent = await self.documentProcessor.processFileData(
            fileData=fileData,
            fileName=fileName,
            mimeType=mimeType,
            base64Encoded=False,
            prompt=prompt,
            documentId=document.id
        )
        return extractedContent
    except Exception as e:
        logger.error(f"Error extracting from document: {str(e)}")
        raise
def createFile(self, fileName: str, mimeType: str, content: str, base64encoded: bool = False) -> str:
    """Persist ``content`` as a new file and return the new file's ID.

    Args:
        fileName: Name to store the file under.
        mimeType: MIME type recorded for the file.
        content: File content as text. If ``base64encoded`` is true it is
            base64-decoded; otherwise it is UTF-8 encoded.
        base64encoded: Whether ``content`` is base64 text.

    Returns:
        The ``id`` of the file item created by the component interface.
    """
    if not base64encoded:
        payload = content.encode('utf-8')
    else:
        import base64
        payload = base64.b64decode(content)

    component = self.interfaceComponent
    # Hash and size are computed inside the component interface.
    fileItem = component.createFile(
        name=fileName,
        mimeType=mimeType,
        content=payload
    )
    # The raw bytes are additionally stored under the new file's ID.
    component.createFileData(fileItem.id, payload)
    return fileItem.id
def createDocument(self, fileName: str, mimeType: str, content: str, base64encoded: bool = True, existing_file_id: str = None) -> ChatDocument:
    """Build a ChatDocument for AI-generated content, creating the file if needed.

    If ``existing_file_id`` is truthy, that file is reused. Otherwise a new file
    is created from ``content`` via ``self.createFile``. The document's
    fileName, fileSize and mimeType are then copied from ``self.getFileInfo``,
    falling back to the given fileName/mimeType (and 0 for the size) when a key
    is absent.

    Raises:
        ValueError: If no file info can be found for the resolved file ID.
    """
    targetFileId = existing_file_id if existing_file_id else self.createFile(fileName, mimeType, content, base64encoded)

    info = self.getFileInfo(targetFileId)
    if not info:
        logger.error(f"Could not get file info for fileId: {targetFileId}")
        raise ValueError(f"File info not found for fileId: {targetFileId}")

    return ChatDocument(
        id=str(uuid.uuid4()),
        fileId=targetFileId,
        fileName=info.get("fileName", fileName),
        fileSize=info.get("size", 0),
        mimeType=info.get("mimeType", mimeType)
    )
def updateWorkflowStats(self, eventLabel: str = None, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None:
    """
    Forward incremental byte counters for the current workflow to the chat interface.

    Args:
        eventLabel: Label for the event (e.g. "userinput", "taskplan", "action",
            "aicall<ainame>"). Accepted for callers, but not forwarded or stored
            by this method.
        bytesSent: Bytes sent (incremental).
        bytesReceived: Bytes received (incremental).
        tokenCount: Token count (incremental). Accepted, but not forwarded.

    Nothing happens when there is no (truthy) ``self.workflow``. Errors from the
    chat interface are logged and swallowed, so stats tracking is best-effort.
    """
    try:
        currentWorkflow = getattr(self, 'workflow', None)
        if not currentWorkflow:
            return
        self.interfaceChat.updateWorkflowStats(
            currentWorkflow.id,
            bytesSent=bytesSent,
            bytesReceived=bytesReceived
        )
    except Exception as e:
        logger.error(f"Error updating workflow stats: {str(e)}")
def calculateObjectSize(self, obj: Any) -> int:
    """
    Estimate the size of an object in bytes, for workflow statistics.

    - ``None`` counts as 0.
    - ``str`` is measured as its UTF-8 encoding, without JSON quotes or
      escape sequences.
    - ``bytes`` / ``bytearray`` are measured by their length. The previous
      JSON path measured their ``str()`` repr instead, e.g. ``"b'\\x00'"``.
    - Anything else is serialized with ``json.dumps`` (non-serializable values
      via ``str()``), and the UTF-8 length of that JSON text is returned.

    Args:
        obj: Object to measure.

    Returns:
        int: Size in bytes; 0 if measuring fails (the failure is logged).
    """
    try:
        import json
        if obj is None:
            return 0
        if isinstance(obj, str):
            return len(obj.encode('utf-8'))
        if isinstance(obj, (bytes, bytearray)):
            return len(obj)
        # Structured objects: approximate by the size of their JSON text.
        json_str = json.dumps(obj, ensure_ascii=False, default=str)
        return len(json_str.encode('utf-8'))
    except Exception as e:
        logger.error(f"Error calculating object size: {str(e)}")
        return 0
def getAvailableDocuments(self, workflow) -> List[str]:
    """
    Return a label for every document attached to the workflow's messages.

    Args:
        workflow: ChatWorkflow object whose ``messages`` (each with a
            ``documents`` list) are scanned.

    Returns:
        List[str]: One label per document, built by
        ``self.generateDocumentLabel(doc, message)``, in message order and then
        document order.
    """
    return [
        self.generateDocumentLabel(doc, message)
        for message in workflow.messages
        for doc in message.documents
    ]
async def executeAction(self, methodName: str, actionName: str, parameters: Dict[str, Any]) -> ActionResult:
    """Look up ``methodName.actionName`` in the method catalog and await it.

    The catalog is ``self.methods``. The callable is found at
    ``self.methods[methodName]['actions'][actionName]['method']`` and is awaited
    with ``parameters`` as its only argument.

    Raises:
        ValueError: If the method or the action is not registered.
        Exception: Anything raised by the action itself.
        Every error is logged before being re-raised unchanged.
    """
    try:
        catalog = self.methods
        if methodName not in catalog:
            raise ValueError(f"Unknown method: {methodName}")
        actions = catalog[methodName]['actions']
        if actionName not in actions:
            raise ValueError(f"Unknown action: {actionName} for method {methodName}")
        handler = actions[actionName]['method']
        return await handler(parameters)
    except Exception as e:
        logger.error(f"Error executing method {methodName}.{actionName}: {str(e)}")
        raise
async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]:
    """Wrap existing files in new ChatDocument objects.

    For each ID, file info is read via ``self.getFileInfo``, and a ChatDocument
    with a fresh UUID is built from it. Missing metadata falls back to "unknown",
    0 and "application/octet-stream". IDs without file info are skipped with a
    warning. Errors for an individual ID are logged, and that ID is skipped.

    Returns:
        The created documents, in the order of ``fileIds``.
    """
    collected = []
    for fileId in fileIds:
        try:
            info = self.getFileInfo(fileId)
            if not info:
                logger.warning(f"No file info found for file ID {fileId}")
                continue
            newDocument = ChatDocument(
                id=str(uuid.uuid4()),
                fileId=fileId,
                fileName=info.get("fileName", "unknown"),
                fileSize=info.get("size", 0),
                mimeType=info.get("mimeType", "application/octet-stream")
            )
            collected.append(newDocument)
            logger.info(f"Processed file ID {fileId} -> {newDocument.fileName}")
        except Exception as e:
            logger.error(f"Error processing file ID {fileId}: {str(e)}")
    return collected
def setUserLanguage(self, language: str) -> None:
    """Assign ``language`` to the current user object.

    Only the in-memory ``self.user.language`` attribute is set; this method
    itself does not persist the change.
    """
    currentUser = self.user
    currentUser.language = language
def setWorkflowContext(self, round_number: int = None, task_number: int = None, action_number: int = None):
    """Set the workflow's round/task/action counters and persist the changes.

    Only arguments that are not None are applied, both to the in-memory workflow
    and in one ``self.interfaceChat.updateWorkflow`` call. When every argument
    is None, nothing is written. Errors are logged and swallowed.
    """
    try:
        requested = (
            ("currentRound", round_number),
            ("currentTask", task_number),
            ("currentAction", action_number),
        )
        changes = {field: value for field, value in requested if value is not None}
        for field, value in changes.items():
            setattr(self.workflow, field, value)
        if not changes:
            return
        # Persist changes to database
        self.interfaceChat.updateWorkflow(self.workflow.id, changes)
        wf = self.workflow
        logger.debug(f"Updated workflow context: Round {getattr(wf, 'currentRound', 'N/A')}, Task {getattr(wf, 'currentTask', 'N/A')}, Action {getattr(wf, 'currentAction', 'N/A')}")
    except Exception as e:
        logger.error(f"Error setting workflow context: {str(e)}")
def getWorkflowContext(self) -> Dict[str, int]:
    """Return the workflow's current round/task/action counters.

    A counter attribute missing from the workflow is reported as 0. If reading
    the workflow fails, the error is logged and all three counters are 0.
    """
    try:
        wf = self.workflow
        return {key: getattr(wf, key, 0) for key in ('currentRound', 'currentTask', 'currentAction')}
    except Exception as e:
        logger.error(f"Error getting workflow context: {str(e)}")
        return {'currentRound': 0, 'currentTask': 0, 'currentAction': 0}
def incrementWorkflowContext(self, context_type: str):
    """Advance one of the workflow's position counters by one and persist it.

    Args:
        context_type: Which counter to advance.
            - 'round': currentRound += 1; currentTask and currentAction reset to 0.
            - 'task': currentTask += 1; currentAction resets to 0.
            - 'action': currentAction += 1.
            Any other value logs a warning and changes nothing.

    A counter attribute missing from the workflow is treated as 0. The new
    values are written with ``self.interfaceChat.updateWorkflow``. Errors are
    logged and swallowed.
    """
    try:
        if context_type not in ('round', 'task', 'action'):
            logger.warning(f"Unknown context type for increment: {context_type}")
            return

        wf = self.workflow
        if context_type == 'round':
            wf.currentRound = getattr(wf, 'currentRound', 0) + 1
            # A new round starts again from the first task and action.
            wf.currentTask = 0
            wf.currentAction = 0
            changes = {
                "currentRound": wf.currentRound,
                "currentTask": 0,
                "currentAction": 0
            }
            logger.info(f"Incremented workflow round to {wf.currentRound}")
        elif context_type == 'task':
            wf.currentTask = getattr(wf, 'currentTask', 0) + 1
            # A new task starts again from the first action.
            wf.currentAction = 0
            changes = {
                "currentTask": wf.currentTask,
                "currentAction": 0
            }
            logger.info(f"Incremented workflow task to {wf.currentTask}")
        else:
            wf.currentAction = getattr(wf, 'currentAction', 0) + 1
            changes = {
                "currentAction": wf.currentAction
            }
            logger.info(f"Incremented workflow action to {wf.currentAction}")

        # Persist changes to database
        self.interfaceChat.updateWorkflow(wf.id, changes)
    except Exception as e:
        logger.error(f"Error incrementing workflow context: {str(e)}")
def getWorkflowStats(self) -> Dict[str, Any]:
    """Return the current workflow position plus basic workflow metadata.

    Keys: currentRound, currentTask, currentAction (from
    ``self.getWorkflowContext()``), then totalTasks and totalActions (default 0),
    workflowStatus and workflowId (default 'unknown'). On any error, the error is
    logged and a dict with the same keys and default values is returned.
    """
    try:
        context = self.getWorkflowContext()
        wf = self.workflow
        stats = {name: context[name] for name in ('currentRound', 'currentTask', 'currentAction')}
        stats['totalTasks'] = getattr(wf, 'totalTasks', 0)
        stats['totalActions'] = getattr(wf, 'totalActions', 0)
        stats['workflowStatus'] = getattr(wf, 'status', 'unknown')
        stats['workflowId'] = getattr(wf, 'id', 'unknown')
        return stats
    except Exception as e:
        logger.error(f"Error getting workflow stats: {str(e)}")
        fallback = dict.fromkeys(('currentRound', 'currentTask', 'currentAction', 'totalTasks', 'totalActions'), 0)
        fallback['workflowStatus'] = 'unknown'
        fallback['workflowId'] = 'unknown'
        return fallback
def refreshDocumentFileAttributes(self, documents: List[ChatDocument]) -> None:
    """Copy fileName, fileSize and mimeType from each document's stored file onto it.

    Documents are updated in place. A document whose file cannot be found is
    left unchanged and logged as a warning. Any other error is logged, and
    processing continues with the next document.
    """
    for doc in documents:
        try:
            source = self.interfaceComponent.getFile(doc.fileId)
            if not source:
                logger.warning(f"File not found for document {doc.id}, fileId: {doc.fileId}")
                continue
            for attr in ('fileName', 'fileSize', 'mimeType'):
                setattr(doc, attr, getattr(source, attr))
        except Exception as e:
            logger.error(f"Error refreshing file attributes for document {doc.id}: {e}")
# Note: Workflow progress update methods have been moved to handlingTasks.py
# where they belong since that's where the actual workflow execution happens
# This avoids circular import issues between ServiceCenter and ChatInterface
def diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]:
    """
    Collect diagnostic information about why a document's properties may be unreadable.

    The report contains the document/file IDs and whether the document's private
    ``_componentInterface`` is set (plus its type name). If it is set, the report
    also says whether ``self.interfaceComponent.getFile(document.fileId)`` finds
    the file, with its fileName/fileSize/mimeType ('N/A' when absent). Problems
    are described under 'error_details'. If the diagnosis itself fails, a
    reduced dict with the IDs and the error is returned.
    """
    try:
        report = {
            'document_id': document.id,
            'file_id': document.fileId,
        }
        component = document._componentInterface
        report.update({
            'has_component_interface': component is not None,
            'component_interface_type': type(component).__name__ if component else None,
            'file_exists': False,
            'file_info': None,
            'error_details': None
        })

        # Without a component interface the document cannot reach the file system.
        if not component:
            report['error_details'] = "Component interface not set - document cannot access file system"
            return report

        # Look the file up directly through this service center's component interface.
        try:
            found = self.interfaceComponent.getFile(document.fileId)
            if not found:
                report['error_details'] = f"File with ID {document.fileId} not found in component interface"
            else:
                report['file_exists'] = True
                report['file_info'] = {
                    'fileName': getattr(found, 'fileName', 'N/A'),
                    'fileSize': getattr(found, 'fileSize', 'N/A'),
                    'mimeType': getattr(found, 'mimeType', 'N/A')
                }
        except Exception as e:
            report['error_details'] = f"Error accessing file {document.fileId}: {str(e)}"
        return report
    except Exception as e:
        return {
            'document_id': getattr(document, 'id', 'unknown'),
            'file_id': getattr(document, 'fileId', 'unknown'),
            'error_details': f"Error during diagnosis: {str(e)}"
        }
def recoverDocumentAccess(self, document: ChatDocument) -> bool:
    """
    Try to restore property access on a document by re-attaching the component interface.

    Calls ``document.setComponentInterface(self.interfaceComponent)`` and then
    probes ``document.fileName``.

    Returns:
        True if the probe succeeds; False if the probe or the re-attachment
        fails (the failure is logged).
    """
    try:
        logger.info(f"Attempting to recover document access for document {document.id}")
        document.setComponentInterface(self.interfaceComponent)
        # Reading fileName is the probe that decides whether recovery worked.
        try:
            recoveredName = document.fileName
            logger.info(f"Document access recovered for {document.id} -> {recoveredName}")
            return True
        except Exception as probeError:
            logger.error(f"Document access recovery failed for {document.id}: {str(probeError)}")
            return False
    except Exception as e:
        logger.error(f"Error during document access recovery for {document.id}: {str(e)}")
        return False
# Placeholder for a shared ("singleton") ServiceCenter instance.
# It starts out as None; nothing in this part of the file assigns it.
serviceObject = None