From 7ee5d4061b50a9d29b041f4a9123e619775b1238 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 4 Nov 2025 01:34:57 +0100
Subject: [PATCH] Add chunking progress callbacks and debug logging to the AI call pipeline
---
modules/interfaces/interfaceAiObjects.py | 45 ++++++--
.../mainServiceExtraction.py | 20 +++-
.../serviceWorkflow/mainServiceWorkflow.py | 103 ++++++------------
modules/workflows/methods/methodOutlook.py | 17 +--
4 files changed, 94 insertions(+), 91 deletions(-)
diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py
index c297e749..8b665b09 100644
--- a/modules/interfaces/interfaceAiObjects.py
+++ b/modules/interfaces/interfaceAiObjects.py
@@ -75,11 +75,11 @@ class AiObjects:
# AI for Extraction, Processing, Generation
- async def call(self, request: AiCallRequest) -> AiCallResponse:
+ async def call(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse:
"""Call AI model for text generation with model-aware chunking."""
# Handle content parts (unified path)
if hasattr(request, 'contentParts') and request.contentParts:
- return await self._callWithContentParts(request)
+ return await self._callWithContentParts(request, progressCallback)
# Handle traditional text/context calls
return await self._callWithTextContext(request)
@@ -148,7 +148,7 @@ class AiObjects:
errorCount=1
)
- async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
+ async def _callWithContentParts(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse:
"""Process content parts with model-aware chunking (unified for single and multiple parts)."""
prompt = request.prompt
options = request.options
@@ -164,7 +164,7 @@ class AiObjects:
# Process each content part
allResults = []
for contentPart in contentParts:
- partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList)
+ partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList, progressCallback)
allResults.append(partResult)
# Merge all results
@@ -180,7 +180,7 @@ class AiObjects:
errorCount=sum(r.errorCount for r in allResults)
)
- async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList) -> AiCallResponse:
+ async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList, progressCallback=None) -> AiCallResponse:
"""Process a single content part with model-aware chunking and fallback."""
lastError = None
@@ -263,11 +263,34 @@ class AiObjects:
if not chunks:
raise ValueError(f"Failed to chunk content part for model {model.name}")
+ logger.info(f"Starting to process {len(chunks)} chunks with model {model.name}")
+
+ # Log progress if callback provided
+ if progressCallback:
+ progressCallback(0.0, f"Starting to process {len(chunks)} chunks")
+
# Process each chunk
chunkResults = []
- for chunk in chunks:
- chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options)
- chunkResults.append(chunkResponse)
+ for idx, chunk in enumerate(chunks):
+ chunkNum = idx + 1
+ logger.info(f"Processing chunk {chunkNum}/{len(chunks)} with model {model.name}")
+
+ # Calculate and log progress
+ if progressCallback:
+ progress = chunkNum / len(chunks)
+ progressCallback(progress, f"Processing chunk {chunkNum}/{len(chunks)}")
+
+ try:
+ chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options)
+ chunkResults.append(chunkResponse)
+ logger.info(f"✅ Chunk {chunkNum}/{len(chunks)} processed successfully")
+
+ # Log completion progress
+ if progressCallback:
+ progressCallback(chunkNum / len(chunks), f"Chunk {chunkNum}/{len(chunks)} processed")
+ except Exception as e:
+ logger.error(f"❌ Error processing chunk {chunkNum}/{len(chunks)}: {str(e)}")
+ raise
# Merge chunk results
mergedContent = self._mergeChunkResults(chunkResults)
@@ -505,9 +528,15 @@ class AiObjects:
options=options or {}
)
+ # Log before calling model
+ logger.debug(f"Calling model {model.name} with {len(messages)} messages, context size: {len(context.encode('utf-8'))} bytes")
+
# Call the model with standardized interface
modelResponse = await model.functionCall(modelCall)
+ # Log after successful call
+ logger.debug(f"Model {model.name} returned successfully")
+
# Extract content from standardized response
if not modelResponse.success:
raise ValueError(f"Model call failed: {modelResponse.error}")
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
index 92d6290d..8217d246 100644
--- a/modules/services/serviceExtraction/mainServiceExtraction.py
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -541,8 +541,24 @@ class ExtractionService:
progress = 0.3 + (processedCount[0] / totalParts * 0.6) # Progress from 0.3 to 0.9
self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {processedCount[0]}/{totalParts}")
- # Call AI with model-aware chunking
- response = await aiObjects.call(request)
+ # Create progress callback for chunking
+ def chunkingProgressCallback(chunkProgress: float, status: str):
+ """Callback to log chunking progress as ChatLog entries"""
+ if self.services.workflow and self.services.currentWorkflow:
+ logData = {
+ "workflowId": self.services.currentWorkflow.id,
+ "message": "Service AI",
+ "type": "info",
+ "status": status,
+ "progress": chunkProgress
+ }
+ try:
+ self.services.workflow.storeLog(self.services.currentWorkflow, logData)
+ except Exception as e:
+ logger.warning(f"Failed to store chunking progress log: {e}")
+
+ # Call AI with model-aware chunking and progress callback
+ response = await aiObjects.call(request, chunkingProgressCallback)
processing_time = time.time() - start_time
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index 4dad14bf..65019280 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -24,8 +24,10 @@ class WorkflowService:
"""Get ChatDocuments from a list of document references using all three formats."""
try:
workflow = self.services.currentWorkflow
+ workflow_id = workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'
+ workflow_obj_id = id(workflow) if workflow else None
logger.debug(f"getChatDocumentsFromDocumentList: input documentList = {documentList}")
- logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow.id if workflow and hasattr(workflow, 'id') else 'NO_ID'}")
+ logger.debug(f"getChatDocumentsFromDocumentList: currentWorkflow.id = {workflow_id}, workflow object id = {workflow_obj_id}")
# Debug: list available messages with their labels and document names
try:
@@ -70,48 +72,38 @@ class WorkflowService:
# Format: docList::