gateway/modules/services/serviceAi/mainServiceAi.py
2025-12-30 02:06:51 +01:00

765 lines
30 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import re
import time
import base64
from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelExtraction import ContentPart, DocumentIntent
from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.datamodels.datamodelDocument import RenderedDocument
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.shared.jsonUtils import (
extractJsonString,
repairBrokenJson,
extractSectionsFromDocument,
buildContinuationContext,
parseJsonWithModel
)
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
from modules.datamodels.datamodelAi import JsonAccumulationState
logger = logging.getLogger(__name__)
# Rebuild the model to resolve forward references
AiCallRequest.model_rebuild()
class AiService:
"""AI service with core operations integrated."""
def __init__(self, serviceCenter=None) -> None:
"""Initialize AI service with service center access.
Args:
serviceCenter: Service center instance for accessing other services
"""
self.services = serviceCenter
# Only depend on interfaces
self.aiObjects = None # Will be initialized in create() or ensureAiObjectsInitialized()
# Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready
self.extractionService = None
def _initializeSubmodules(self):
"""Initialize all submodules after aiObjects is ready."""
if self.aiObjects is None:
raise RuntimeError("aiObjects must be initialized before initializing submodules")
if self.extractionService is None:
logger.info("Initializing ExtractionService...")
self.extractionService = ExtractionService(self.services)
# Initialize new submodules
from modules.services.serviceAi.subResponseParsing import ResponseParser
from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer
from modules.services.serviceAi.subContentExtraction import ContentExtractor
from modules.services.serviceAi.subStructureGeneration import StructureGenerator
from modules.services.serviceAi.subStructureFilling import StructureFiller
from modules.services.serviceAi.subAiCallLooping import AiCallLooper
if not hasattr(self, 'responseParser'):
logger.info("Initializing ResponseParser...")
self.responseParser = ResponseParser(self.services)
if not hasattr(self, 'intentAnalyzer'):
logger.info("Initializing DocumentIntentAnalyzer...")
self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self)
if not hasattr(self, 'contentExtractor'):
logger.info("Initializing ContentExtractor...")
self.contentExtractor = ContentExtractor(self.services, self, self.intentAnalyzer)
if not hasattr(self, 'structureGenerator'):
logger.info("Initializing StructureGenerator...")
self.structureGenerator = StructureGenerator(self.services, self)
if not hasattr(self, 'structureFiller'):
logger.info("Initializing StructureFiller...")
self.structureFiller = StructureFiller(self.services, self)
if not hasattr(self, 'aiCallLooper'):
logger.info("Initializing AiCallLooper...")
self.aiCallLooper = AiCallLooper(self.services, self, self.responseParser)
async def callAi(self, request: AiCallRequest, progressCallback=None):
"""Router: handles content parts via extractionService, text context via interface.
Replaces direct calls to self.aiObjects.call() to route content parts processing
through serviceExtraction layer.
"""
if hasattr(request, 'contentParts') and request.contentParts:
return await self.extractionService.processContentPartsWithAi(
request, self.aiObjects, progressCallback
)
return await self.aiObjects.callWithTextContext(request)
async def ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized and submodules are ready."""
if self.aiObjects is None:
logger.info("Lazy initializing AiObjects...")
self.aiObjects = await AiObjects.create()
logger.info("AiObjects initialization completed")
# Initialize submodules after aiObjects is ready
self._initializeSubmodules()
@classmethod
async def create(cls, serviceCenter=None) -> "AiService":
"""Create AiService instance with all connectors and submodules initialized."""
logger.info("AiService.create() called")
instance = cls(serviceCenter)
logger.info("AiService created, about to call AiObjects.create()...")
instance.aiObjects = await AiObjects.create()
logger.info("AiObjects.create() completed")
# Initialize all submodules after aiObjects is ready
instance._initializeSubmodules()
logger.info("AiService submodules initialized")
return instance
# Helper methods
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
Args:
prompt: The base prompt template
placeholders: Dictionary of placeholder key-value pairs
Returns:
Prompt with placeholders replaced
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Skip if content is None or empty
if content is None:
continue
# Replace {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", str(content))
return full_prompt
async def _analyzePromptAndCreateOptions(self, prompt: str) -> AiCallOptions:
"""Analyze prompt to determine appropriate AiCallOptions parameters."""
try:
# Get dynamic enum values from Pydantic models
operationTypes = [e.value for e in OperationTypeEnum]
priorities = [e.value for e in PriorityEnum]
processingModes = [e.value for e in ProcessingModeEnum]
# Create analysis prompt for AI to determine operation type and parameters
analysisPrompt = f"""
You are an AI operation analyzer. Analyze the following prompt and determine the most appropriate operation type and parameters.
PROMPT TO ANALYZE:
{self.services.utils.sanitizePromptContent(prompt, 'userinput')}
Based on the prompt content, determine:
1. operationType: Choose the most appropriate from: {', '.join(operationTypes)}
2. priority: Choose from: {', '.join(priorities)}
3. processingMode: Choose from: {', '.join(processingModes)}
4. compressPrompt: true/false (true for story-like prompts, false for structured prompts with JSON/schemas)
5. compressContext: true/false (true to summarize context, false to process fully)
Respond with ONLY a JSON object in this exact format:
{{
"operationType": "dataAnalyse",
"priority": "balanced",
"processingMode": "basic",
"compressPrompt": true,
"compressContext": true
}}
"""
# Use AI to analyze the prompt
request = AiCallRequest(
prompt=analysisPrompt,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.SPEED,
processingMode=ProcessingModeEnum.BASIC,
compressPrompt=True,
compressContext=False
)
)
response = await self.callAi(request)
# Parse AI response using structured parsing with AiCallOptions model
try:
# Use parseJsonWithModel to parse response into AiCallOptions (handles enum conversion automatically)
analysis = parseJsonWithModel(response.content, AiCallOptions)
return analysis
except Exception as e:
logger.warning(f"Failed to parse AI analysis response: {e}")
except Exception as e:
logger.warning(f"Prompt analysis failed: {e}")
# Fallback to default options
return AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC
)
async def _callAiWithLooping(
self,
prompt: str,
options: AiCallOptions,
debugPrefix: str = "ai_call",
promptBuilder: Optional[callable] = None,
promptArgs: Optional[Dict[str, Any]] = None,
operationId: Optional[str] = None,
userPrompt: Optional[str] = None,
contentParts: Optional[List[ContentPart]] = None # ARCHITECTURE: Support ContentParts for large content
) -> str:
"""Delegate to AiCallLooper."""
return await self.aiCallLooper.callAiWithLooping(
prompt, options, debugPrefix, promptBuilder, promptArgs, operationId, userPrompt, contentParts
)
async def _defineKpisFromPrompt(
self,
userPrompt: str,
rawJsonString: Optional[str],
continuationContext: Dict[str, Any],
debugPrefix: str = "kpi"
) -> List[Dict[str, Any]]:
"""Delegate to AiCallLooper."""
return await self.aiCallLooper._defineKpisFromPrompt(
userPrompt, rawJsonString, continuationContext, debugPrefix
)
# JSON merging logic moved to subJsonResponseHandling.py
def _extractSectionsFromResponse(
self,
result: str,
iteration: int,
debugPrefix: str,
allSections: List[Dict[str, Any]] = None,
accumulationState: Optional[JsonAccumulationState] = None
) -> Tuple[List[Dict[str, Any]], bool, Optional[Dict[str, Any]], Optional[JsonAccumulationState]]:
"""Delegate to ResponseParser."""
return self.responseParser.extractSectionsFromResponse(
result, iteration, debugPrefix, allSections, accumulationState
)
def _shouldContinueGeneration(
self,
allSections: List[Dict[str, Any]],
iteration: int,
wasJsonComplete: bool,
rawResponse: str = None
) -> bool:
"""Delegate to ResponseParser."""
return self.responseParser.shouldContinueGeneration(
allSections, iteration, wasJsonComplete, rawResponse
)
def _extractDocumentMetadata(
self,
parsedResult: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""Delegate to ResponseParser."""
return self.responseParser.extractDocumentMetadata(parsedResult)
def _buildFinalResultFromSections(
self,
allSections: List[Dict[str, Any]],
documentMetadata: Optional[Dict[str, Any]] = None
) -> str:
"""Delegate to ResponseParser."""
return self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)
# Public API Methods
# Planning AI Call
async def callAiPlanning(
self,
prompt: str,
placeholders: Optional[List[PromptPlaceholder]] = None,
debugType: Optional[str] = None
) -> str:
"""
Planning AI call for task planning, action planning, action selection, etc.
Always uses static parameters optimized for planning tasks.
Args:
prompt: The planning prompt
placeholders: Optional list of placeholder replacements
debugType: Optional debug file type identifier (e.g., 'taskplan', 'dynamic', 'intentanalysis')
If not provided, defaults to 'plan'
Returns:
Planning JSON response
"""
await self.ensureAiObjectsInitialized()
# Planning calls always use static parameters
options = AiCallOptions(
operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
compressPrompt=False,
compressContext=False
)
# Build full prompt with placeholders
if placeholders:
placeholdersDict = {p.label: p.content for p in placeholders}
fullPrompt = self._buildPromptWithPlaceholders(prompt, placeholdersDict)
else:
fullPrompt = prompt
# Root-cause fix: planning must return raw single-shot JSON, not section-based output
request = AiCallRequest(
prompt=fullPrompt,
context="",
options=options
)
# Debug: persist prompt/response for analysis with context-specific naming
debugPrefix = debugType if debugType else "plan"
self.services.utils.writeDebugFile(fullPrompt, f"{debugPrefix}_prompt")
response = await self.aiObjects.callWithTextContext(request)
result = response.content or ""
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
return result
# Helper methods for callAiContent refactoring
async def _handleImageGeneration(
self,
prompt: str,
options: AiCallOptions,
title: Optional[str],
aiOperationId: str
) -> AiResponse:
"""Handle IMAGE_GENERATE operation type."""
self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation")
request = AiCallRequest(
prompt=prompt,
context="",
options=options
)
response = await self.callAi(request)
if not response.content:
errorMsg = f"No image data returned: {response.content}"
logger.error(f"Error in AI image generation: {errorMsg}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(errorMsg)
imageDoc = DocumentData(
documentName="generated_image.png",
documentData=response.content,
mimeType="image/png"
)
metadata = AiResponseMetadata(
title=title or "Generated Image",
operationType=options.operationType.value
)
self.services.chat.storeWorkflowStat(
self.services.workflow,
response,
"ai.generate.image"
)
self.services.chat.progressLogUpdate(aiOperationId, 0.9, "Image generated")
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=response.content,
metadata=metadata,
documents=[imageDoc]
)
async def _handleWebOperation(
self,
prompt: str,
options: AiCallOptions,
opType: OperationTypeEnum,
aiOperationId: str
) -> AiResponse:
"""Handle WEB_SEARCH and WEB_CRAWL operation types."""
self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
request = AiCallRequest(
prompt=prompt, # Raw JSON prompt - connector will parse it
context="",
options=options
)
response = await self.callAi(request)
if not response.content:
errorMsg = f"No content returned from {opType.name}: {response.content}"
logger.error(f"Error in {opType.name}: {errorMsg}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(errorMsg)
metadata = AiResponseMetadata(
operationType=opType.value
)
self.services.chat.storeWorkflowStat(
self.services.workflow,
response,
f"ai.{opType.name.lower()}"
)
self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed")
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=response.content,
metadata=metadata
)
def _getIntentForDocument(
self,
docId: str,
intents: Optional[List[DocumentIntent]]
) -> Optional[DocumentIntent]:
"""Find DocumentIntent for given documentId."""
if not intents:
return None
for intent in intents:
if intent.documentId == docId:
return intent
return None
async def _clarifyDocumentIntents(
self,
documents: List[ChatDocument],
userPrompt: str,
actionParameters: Dict[str, Any],
parentOperationId: str
) -> List[DocumentIntent]:
"""Delegate to DocumentIntentAnalyzer."""
return await self.intentAnalyzer.clarifyDocumentIntents(
documents, userPrompt, actionParameters, parentOperationId
)
async def _extractAndPrepareContent(
self,
documents: List[ChatDocument],
documentIntents: List[DocumentIntent],
parentOperationId: str
) -> List[ContentPart]:
"""Delegate to ContentExtractor."""
return await self.contentExtractor.extractAndPrepareContent(
documents, documentIntents, parentOperationId, self._getIntentForDocument
)
async def _generateStructure(
self,
userPrompt: str,
contentParts: List[ContentPart],
outputFormat: str,
parentOperationId: str
) -> Dict[str, Any]:
"""Delegate to StructureGenerator."""
return await self.structureGenerator.generateStructure(
userPrompt, contentParts, outputFormat, parentOperationId
)
async def _fillStructure(
self,
structure: Dict[str, Any],
contentParts: List[ContentPart],
userPrompt: str,
parentOperationId: str
) -> Dict[str, Any]:
"""Delegate to StructureFiller."""
return await self.structureFiller.fillStructure(
structure, contentParts, userPrompt, parentOperationId
)
async def _renderResult(
self,
filledStructure: Dict[str, Any],
outputFormat: str,
title: str,
userPrompt: str,
parentOperationId: str
) -> List[RenderedDocument]:
"""
Phase 5E: Rendert gefüllte Struktur zum Ziel-Format.
Jedes Dokument wird einzeln gerendert, jeder Renderer kann 1..n Dokumente zurückgeben.
Args:
filledStructure: Gefüllte Struktur mit elements
outputFormat: Ziel-Format (pdf, docx, html, etc.) - wird für alle Dokumente verwendet
title: Dokument-Titel
userPrompt: User-Anfrage
parentOperationId: Parent Operation-ID für ChatLog-Hierarchie
Returns:
List of RenderedDocument objects.
Jedes RenderedDocument repräsentiert ein gerendertes Dokument (Hauptdokument oder unterstützende Datei)
"""
# Erstelle Operation-ID für Rendering
renderOperationId = f"{parentOperationId}_rendering"
# Starte ChatLog mit Parent-Referenz
self.services.chat.progressLogStart(
renderOperationId,
"Content Rendering",
"Rendering",
f"Rendering to {outputFormat} format",
parentOperationId=parentOperationId
)
try:
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
# renderReport verarbeitet jetzt jedes Dokument einzeln
# und gibt Liste von (documentData, mimeType, filename) zurück
renderedDocuments = await generationService.renderReport(
filledStructure,
outputFormat,
title,
userPrompt,
self,
parentOperationId=renderOperationId # Parent-Referenz für ChatLog-Hierarchie
)
# ChatLog abschließen
self.services.chat.progressLogFinish(renderOperationId, True)
return renderedDocuments
except Exception as e:
self.services.chat.progressLogFinish(renderOperationId, False)
logger.error(f"Error in _renderResult: {str(e)}")
raise
def _shouldSkipContentPart(
self,
part: ContentPart
) -> bool:
"""Check if ContentPart should be skipped (already structured JSON)."""
if part.typeGroup == "structure" and part.mimeType == "application/json":
if part.metadata.get("skipExtraction", False):
logger.debug(f"Skipping already-structured JSON ContentPart {part.id} (skipExtraction=True)")
return True
try:
if isinstance(part.data, str):
jsonData = json.loads(part.data)
if isinstance(jsonData, dict) and ("documents" in jsonData or "sections" in jsonData):
logger.debug(f"Skipping already-structured JSON ContentPart {part.id} (contains documents/sections)")
return True
except Exception:
pass # Not JSON, continue processing
return False
async def callAiContent(
self,
prompt: str,
options: AiCallOptions,
contentParts: Optional[List[ContentPart]] = None,
documentList: Optional[Any] = None, # DocumentReferenceList
documentIntents: Optional[List[DocumentIntent]] = None,
outputFormat: Optional[str] = None,
title: Optional[str] = None,
parentOperationId: Optional[str] = None
) -> AiResponse:
"""
Einheitliche AI-Content-Verarbeitung - Single Entry Point für alle AI-Actions.
Alle AI-Actions (ai.process, ai.generateDocument, etc.) routen hier durch.
Sie unterscheiden sich nur in Parametern, nicht in Logik.
Args:
prompt: The main prompt for the AI call
options: AI call configuration options (REQUIRED - operationType must be set)
contentParts: Optional list of already-extracted content parts (preferred)
documentList: Optional DocumentReferenceList (wird zu ChatDocuments konvertiert)
documentIntents: Optional list of DocumentIntent objects (wird erstellt wenn nicht vorhanden)
outputFormat: Optional output format for document generation (e.g., 'pdf', 'docx', 'xlsx')
title: Optional title for generated documents
parentOperationId: Optional parent operation ID for hierarchical logging
Returns:
AiResponse with content, metadata, and optional documents
"""
await self.ensureAiObjectsInitialized()
# Erstelle Operation-ID
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
aiOperationId = f"ai_content_{workflowId}_{int(time.time())}"
# Starte Progress-Tracking mit Parent-Referenz
self.services.chat.progressLogStart(
aiOperationId,
"AI content processing",
"Content Processing",
f"Format: {outputFormat or 'text'}",
parentOperationId=parentOperationId
)
try:
# Initialisiere Defaults
if not outputFormat:
outputFormat = "txt"
opType = getattr(options, "operationType", None)
if not opType:
options.operationType = OperationTypeEnum.DATA_GENERATE
opType = OperationTypeEnum.DATA_GENERATE
# Route zu Operation-spezifischen Handlern
if opType == OperationTypeEnum.IMAGE_GENERATE:
return await self._handleImageGeneration(prompt, options, title, aiOperationId)
if opType == OperationTypeEnum.WEB_SEARCH or opType == OperationTypeEnum.WEB_CRAWL:
return await self._handleWebOperation(prompt, options, opType, aiOperationId)
# Dokument-Generierungs-Pfad
options.compressPrompt = False
options.compressContext = False
# Schritt 5A: Kläre Dokument-Intents
documents = []
if documentList:
documents = self.services.chat.getChatDocumentsFromDocumentList(documentList)
if not documentIntents and documents:
documentIntents = await self._clarifyDocumentIntents(
documents,
prompt,
{"outputFormat": outputFormat},
aiOperationId
)
# Schritt 5B: Extrahiere und bereite Content vor
if documents:
preparedContentParts = await self._extractAndPrepareContent(
documents,
documentIntents or [],
aiOperationId
)
# Merge mit bereitgestellten contentParts (falls vorhanden)
if contentParts:
# Prüfe auf pre-extracted Content
for part in contentParts:
if part.metadata.get("skipExtraction", False):
# Bereits extrahiert - verwende as-is, stelle sicher dass Metadaten vollständig
part.metadata.setdefault("contentFormat", "extracted")
part.metadata.setdefault("isPreExtracted", True)
preparedContentParts.extend(contentParts)
contentParts = preparedContentParts
# Schritt 5C: Generiere Struktur
structure = await self._generateStructure(
prompt,
contentParts or [],
outputFormat,
aiOperationId
)
# Schritt 5D: Fülle Struktur
# Language will be extracted from services (user intention analysis) in fillStructure
filledStructure = await self._fillStructure(
structure,
contentParts or [],
prompt,
aiOperationId
)
# Schritt 5E: Rendere Resultat
# Jedes Dokument wird einzeln gerendert, kann 1..n Dateien zurückgeben (z.B. HTML + Bilder)
renderedDocuments = await self._renderResult(
filledStructure,
outputFormat,
title or "Generated Document",
prompt,
aiOperationId
)
# Baue Response: Konvertiere alle gerenderten Dokumente zu DocumentData
documentDataList = []
for renderedDoc in renderedDocuments:
try:
# Erstelle DocumentData für jedes gerenderte Dokument
docDataObj = DocumentData(
documentName=renderedDoc.filename,
documentData=renderedDoc.documentData,
mimeType=renderedDoc.mimeType,
sourceJson=filledStructure if len(documentDataList) == 0 else None # Nur für erstes Dokument
)
documentDataList.append(docDataObj)
logger.debug(f"Added rendered document: {renderedDoc.filename} ({len(renderedDoc.documentData)} bytes, {renderedDoc.mimeType})")
except Exception as e:
logger.warning(f"Error creating document {renderedDoc.filename}: {str(e)}")
if not documentDataList:
raise ValueError("No documents were rendered")
metadata = AiResponseMetadata(
title=title or filledStructure.get("metadata", {}).get("title", "Generated Document"),
operationType=opType.value
)
# Debug-Log (harmonisiert)
self.services.utils.writeDebugFile(
json.dumps(filledStructure, indent=2, ensure_ascii=False, default=str),
"document_generation_response"
)
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=json.dumps(filledStructure),
metadata=metadata,
documents=documentDataList
)
except Exception as e:
logger.error(f"Error in callAiContent: {str(e)}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise
def _determineDocumentName(
self,
filledStructure: Dict[str, Any],
outputFormat: str,
title: Optional[str]
) -> str:
"""Bestimme Dokument-Namen aus Struktur oder Titel."""
# Versuche aus Struktur zu extrahieren
if isinstance(filledStructure, dict) and "documents" in filledStructure:
docs = filledStructure["documents"]
if isinstance(docs, list) and len(docs) > 0:
firstDoc = docs[0]
if isinstance(firstDoc, dict) and firstDoc.get("filename"):
return firstDoc["filename"]
# Fallback zu Titel
if title:
sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", title)
sanitized = re.sub(r"_+", "_", sanitized).strip("_")
if sanitized:
if not sanitized.lower().endswith(f".{outputFormat}"):
return f"{sanitized}.{outputFormat}"
return sanitized
return f"generated.{outputFormat}"