Core AI system ready for production

ValueOn AG 2025-11-04 01:34:57 +01:00
parent a53d8f8e33
commit 7ee5d4061b
4 changed files with 94 additions and 91 deletions

View file

@@ -75,11 +75,11 @@ class AiObjects:
     # AI for Extraction, Processing, Generation
-    async def call(self, request: AiCallRequest) -> AiCallResponse:
+    async def call(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse:
         """Call AI model for text generation with model-aware chunking."""
         # Handle content parts (unified path)
         if hasattr(request, 'contentParts') and request.contentParts:
-            return await self._callWithContentParts(request)
+            return await self._callWithContentParts(request, progressCallback)
         # Handle traditional text/context calls
         return await self._callWithTextContext(request)
@@ -148,7 +148,7 @@ class AiObjects:
             errorCount=1
         )
 
-    async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
+    async def _callWithContentParts(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse:
         """Process content parts with model-aware chunking (unified for single and multiple parts)."""
         prompt = request.prompt
         options = request.options
@@ -164,7 +164,7 @@ class AiObjects:
         # Process each content part
         allResults = []
         for contentPart in contentParts:
-            partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList)
+            partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList, progressCallback)
             allResults.append(partResult)
 
         # Merge all results
@@ -180,7 +180,7 @@ class AiObjects:
             errorCount=sum(r.errorCount for r in allResults)
         )
 
-    async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList) -> AiCallResponse:
+    async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList, progressCallback=None) -> AiCallResponse:
         """Process a single content part with model-aware chunking and fallback."""
         lastError = None
@@ -263,11 +263,34 @@ class AiObjects:
             if not chunks:
                 raise ValueError(f"Failed to chunk content part for model {model.name}")
             logger.info(f"Starting to process {len(chunks)} chunks with model {model.name}")
+            # Log progress if callback provided
+            if progressCallback:
+                progressCallback(0.0, f"Starting to process {len(chunks)} chunks")
             # Process each chunk
             chunkResults = []
-            for chunk in chunks:
-                chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options)
-                chunkResults.append(chunkResponse)
+            for idx, chunk in enumerate(chunks):
+                chunkNum = idx + 1
+                logger.info(f"Processing chunk {chunkNum}/{len(chunks)} with model {model.name}")
+                # Calculate and log progress
+                if progressCallback:
+                    progress = chunkNum / len(chunks)
+                    progressCallback(progress, f"Processing chunk {chunkNum}/{len(chunks)}")
+                try:
+                    chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options)
+                    chunkResults.append(chunkResponse)
+                    logger.info(f"✅ Chunk {chunkNum}/{len(chunks)} processed successfully")
+                    # Log completion progress
+                    if progressCallback:
+                        progressCallback(chunkNum / len(chunks), f"Chunk {chunkNum}/{len(chunks)} processed")
+                except Exception as e:
+                    logger.error(f"❌ Error processing chunk {chunkNum}/{len(chunks)}: {str(e)}")
+                    raise
             # Merge chunk results
             mergedContent = self._mergeChunkResults(chunkResults)
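Taken together, the loop above gives the callback a deterministic sequence. For a content part that chunks into four pieces, the calls follow directly from the code:

# Progress sequence produced by the loop above for len(chunks) == 4:
#   progressCallback(0.0,  "Starting to process 4 chunks")
#   progressCallback(0.25, "Processing chunk 1/4")
#   progressCallback(0.25, "Chunk 1/4 processed")
#   progressCallback(0.5,  "Processing chunk 2/4")
#   ...
#   progressCallback(1.0,  "Chunk 4/4 processed")
# Note: the "Processing" and "processed" messages for a chunk report the same
# fraction, since both are computed as chunkNum / len(chunks).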
@@ -505,9 +528,15 @@ class AiObjects:
                 options=options or {}
             )
+            # Log before calling model
+            logger.debug(f"Calling model {model.name} with {len(messages)} messages, context size: {len(context.encode('utf-8'))} bytes")
             # Call the model with standardized interface
             modelResponse = await model.functionCall(modelCall)
+            # Log after successful call
+            logger.debug(f"Model {model.name} returned successfully")
             # Extract content from standardized response
             if not modelResponse.success:
                 raise ValueError(f"Model call failed: {modelResponse.error}")

View file

@@ -541,8 +541,24 @@ class ExtractionService:
             progress = 0.3 + (processedCount[0] / totalParts * 0.6)  # Progress from 0.3 to 0.9
             self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
 
-            # Call AI with model-aware chunking
-            response = await aiObjects.call(request)
+            # Create progress callback for chunking
+            def chunkingProgressCallback(chunkProgress: float, status: str):
+                """Callback to log chunking progress as ChatLog entries"""
+                if self.services.workflow and self.services.currentWorkflow:
+                    logData = {
+                        "workflowId": self.services.currentWorkflow.id,
+                        "message": "Service AI",
+                        "type": "info",
+                        "status": status,
+                        "progress": chunkProgress
+                    }
+                    try:
+                        self.services.workflow.storeLog(self.services.currentWorkflow, logData)
+                    except Exception as e:
+                        logger.warning(f"Failed to store chunking progress log: {e}")
+
+            # Call AI with model-aware chunking and progress callback
+            response = await aiObjects.call(request, chunkingProgressCallback)
 
         processing_time = time.time() - start_time
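The contract between the two services is a plain synchronous callable: it runs inside the async chunk loop, so it should return quickly, and storeLog failures are caught and downgraded to warnings so progress logging can never abort extraction. A sketch of the implied type (the alias name is an assumption for illustration):

from typing import Callable, Optional

# Implied by this diff: a synchronous callable taking (progress fraction,
# status text). The alias name ProgressCallback is illustrative, not from
# this commit.
ProgressCallback = Optional[Callable[[float, str], None]]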

View file

@@ -24,8 +24,10 @@ class WorkflowService:
         """Get ChatDocuments from a list of document references using all three formats."""
         try:
             workflow = self.services.currentWorkflow
+            workflow_id = workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'
+            workflow_obj_id = id(workflow) if workflow else None
             logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
-            logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}")
+            logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow_id}, workflow object id = {workflow_obj_id}")
 
             # Debug: list available messages with their labels and document names
             try:
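Logging id(workflow) alongside workflow.id distinguishes database identity from Python object identity: two in-memory copies of the same workflow row share workflow.id but differ under id(), which is exactly the stale-state situation this debug output is meant to expose. A small illustration (loadWorkflow is a hypothetical loader, not this codebase's API):

# Hypothetical illustration - loadWorkflow does not exist in this codebase.
a = loadWorkflow("wf-1")
b = loadWorkflow("wf-1")
assert a.id == b.id        # same database identity
assert id(a) != id(b)      # distinct Python objects; edits to one are invisible to the other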
@@ -70,48 +72,38 @@ class WorkflowService:
                     # Format: docList:<messageId>:<label>
                     message_id = parts[1]
                     label = parts[2]
-                    # Find the message by ID and get all its documents
-                    message_found = False
+                    # First try to find the message by ID in the current workflow
+                    message_found = None
                     for message in workflow.messages:
                         if str(message.id) == message_id:
-                            message_found = True
-                            if message.documents:
-                                doc_names = [doc.fileName for doc in message.documents if hasattr(doc, 'fileName')]
-                                all_documents.extend(message.documents)
-                            else:
-                                pass
+                            message_found = message
                             break
+                    # If message ID not found in current workflow, this is a stale reference
+                    # Log warning and skip it (don't fall back to label - it might match wrong message)
                     if not message_found:
                         available_ids = [str(msg.id) for msg in workflow.messages]
-                        logger.error(f"Message with ID {message_id} not found in workflow. Available message IDs: {available_ids}")
-                        raise ValueError(f"Document reference not found: docList:{message_id}:{label}")
+                        logger.warning(f"Document reference contains stale message ID {message_id} not found in current workflow {workflow.id}. Label: {label}. Available message IDs: {available_ids}")
+                        logger.warning(f"This indicates the document reference was created in a different workflow state. Returning empty list.")
+                        # Skip this reference - don't fall back to label matching which could match wrong message
+                        continue
+                    # If found, add documents
+                    if message_found and message_found.documents:
+                        all_documents.extend(message_found.documents)
                 elif len(parts) >= 2:
                     # Format: docList:<label> - find message by documentsLabel
                     label = parts[1]
-                    # Find messages with matching documentsLabel
-                    matching_messages = []
+                    message_found = None
                     for message in workflow.messages:
                         # Check both attribute and raw data for documentsLabel
                         msg_label = getattr(message, 'documentsLabel', None)
                         if msg_label == label:
-                            matching_messages.append(message)
-                        else:
-                            pass
+                            message_found = message
+                            break
-                    if matching_messages:
-                        # Use the newest message (highest publishedAt)
-                        matching_messages.sort(key=lambda msg: getattr(msg, 'publishedAt', 0), reverse=True)
-                        newest_message = matching_messages[0]
-                        if newest_message.documents:
-                            doc_names = [doc.fileName for doc in newest_message.documents if hasattr(doc, 'fileName')]
-                            all_documents.extend(newest_message.documents)
-                        else:
-                            pass
-                    else:
-                        logger.error(f"No messages found with documentsLabel: {label}")
-                        raise ValueError(f"Document reference not found: docList:{label}")
+                    # If found, add documents
+                    if message_found and message_found.documents:
+                        all_documents.extend(message_found.documents)
                 else:
                     # Direct label reference (round1_task2_action3_contextinfo)
                     # Search for messages with matching documentsLabel to find the actual documents
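In isolation, the three reference formats the branches above distinguish look like this (a standalone sketch; the numeric message ID is an example value, and the real code derives parts from the reference string as shown above):

# Standalone sketch of the docList reference formats handled above.
ref = "docList:42:round1_task2_action3_contextinfo"   # example values
parts = ref.split(":")
if parts[0] == "docList" and len(parts) >= 3:
    message_id, label = parts[1], parts[2]   # docList:<messageId>:<label> - skipped as stale if the ID is unknown
elif parts[0] == "docList" and len(parts) >= 2:
    label = parts[1]                         # docList:<label> - first message whose documentsLabel matches
else:
    label = ref                              # bare label, resolved via documentsLabel search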
@@ -580,17 +572,9 @@ class WorkflowService:
         # Show history exchanges (previous rounds)
         if document_list["history"]:
             for exchange in document_list["history"]:
-                # Find the message that corresponds to this exchange
-                message_id = None
-                for message in workflow.messages:
-                    if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange['documentsLabel']:
-                        message_id = message.id
-                        break
-                if message_id:
-                    doc_list_ref = f"docList:{message_id}:{exchange['documentsLabel']}"
-                else:
-                    doc_list_ref = f"docList:{exchange['documentsLabel']}"
+                # Use label-only format to avoid stale message ID references
+                # Labels are stable identifiers that persist across workflow state changes
+                doc_list_ref = f"docList:{exchange['documentsLabel']}"
                 context += f"- {doc_list_ref} ({len(exchange['documents'])} documents)\n"
         else:
@@ -608,6 +592,11 @@ class WorkflowService:
         if not workflow or not hasattr(workflow, 'messages'):
             return "No documents available"
 
+        workflow_id = workflow.id if hasattr(workflow, 'id') else 'NO_ID'
+        workflow_obj_id = id(workflow)
+        current_workflow_obj_id = id(self.services.currentWorkflow) if self.services.currentWorkflow else None
+        logger.debug(f"getAvailableDocuments: workflow.id = {workflow_id}, workflow object id = {workflow_obj_id}, currentWorkflow object id = {current_workflow_obj_id}")
 
         # Use the provided workflow object directly to avoid database reload issues
         # that can cause filename truncation. The workflow object should already be up-to-date.
@@ -623,19 +612,9 @@ class WorkflowService:
         if document_list["chat"]:
             context += "\nCurrent round documents:\n"
             for exchange in document_list["chat"]:
-                # Generate docList reference for the exchange (using message ID and label)
-                # Find the message that corresponds to this exchange
-                message_id = None
-                for message in workflow.messages:
-                    if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange['documentsLabel']:
-                        message_id = message.id
-                        break
-                if message_id:
-                    doc_list_ref = f"docList:{message_id}:{exchange['documentsLabel']}"
-                else:
-                    # Fallback to label-only format if message ID not found
-                    doc_list_ref = f"docList:{exchange['documentsLabel']}"
+                # Use label-only format to avoid stale message ID references
+                # Labels are stable identifiers that persist across workflow state changes
+                doc_list_ref = f"docList:{exchange['documentsLabel']}"
                 context += f"- {doc_list_ref} contains:\n"
 
                 # Generate docItem references for each document in the list
@@ -651,19 +630,9 @@ class WorkflowService:
         if document_list["history"]:
             context += "\nPast rounds documents:\n"
             for exchange in document_list["history"]:
-                # Generate docList reference for the exchange (using message ID and label)
-                # Find the message that corresponds to this exchange
-                message_id = None
-                for message in workflow.messages:
-                    if hasattr(message, 'documentsLabel') and message.documentsLabel == exchange['documentsLabel']:
-                        message_id = message.id
-                        break
-                if message_id:
-                    doc_list_ref = f"docList:{message_id}:{exchange['documentsLabel']}"
-                else:
-                    # Fallback to label-only format if message ID not found
-                    doc_list_ref = f"docList:{exchange['documentsLabel']}"
+                # Use label-only format to avoid stale message ID references
+                # Labels are stable identifiers that persist across workflow state changes
+                doc_list_ref = f"docList:{exchange['documentsLabel']}"
                 context += f"- {doc_list_ref} contains:\n"
 
                 # Generate docItem references for each document in the list
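With the label-only format, the current-round and past-round sections now emit identical, stable references. Illustratively, the generated context reads like this (labels and the docItem lines are example values, not output captured from this commit):

Current round documents:
- docList:round1_task2_action3_contextinfo contains:
  ...docItem references for each document...

Past rounds documents:
- docList:round1_task1_action2_contextinfo contains:
  ...docItem references for each document...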

View file

@@ -12,7 +12,6 @@ import requests
 from modules.workflows.methods.methodBase import MethodBase, action
 from modules.datamodels.datamodelChat import ActionResult, ActionDocument
-from modules.datamodels.datamodelAi import AiCallOptions
 
 logger = logging.getLogger(__name__)
@@ -1184,20 +1183,10 @@ Return JSON:
         # Call AI service to generate email content
         try:
-            ai_response = await self.services.ai.callAiDocuments(
+            ai_response = await self.services.ai.callAiPlanning(
                 prompt=ai_prompt,
                 documents=chatDocuments,
-                options=AiCallOptions(
-                    operationType="email_composition",
-                    priority="normal",
-                    compressPrompt=False,
-                    compressContext=True,
-                    processDocumentsIndividually=False,  # Process all documents together for email composition
-                    processingMode="detailed",
-                    resultFormat="json",
-                    maxCost=0.50,
-                    maxProcessingTime=30
-                )
+                placeholders=None,
+                debugType="email_composition"
             )
 
             # Parse AI response
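The diff cuts off at the response-parsing step. A hedged sketch of what defensive parsing could look like here, assuming the planning call returns the model's JSON as text (the ai_response.content field and the fallback behavior are assumptions, not shown in this commit):

import json

# Assumption: ai_response.content holds the model's JSON text; this sketch is
# illustrative and not code from this commit.
try:
    email_data = json.loads(ai_response.content)
except (json.JSONDecodeError, TypeError, AttributeError) as e:
    logger.error(f"Failed to parse AI email composition response: {e}")
    email_data = None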