gateway/modules/shared/progressLogger.py
2025-12-30 11:11:31 +01:00

187 lines
8.4 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Progress Logger utility for standardized progress reporting in workflows.
Provides centralized progress management with start/update/finish methods.
"""
import time
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)


class ProgressLogger:
    """Centralized progress logger for workflow operations with hierarchical support.

    Each operation is identified by a caller-chosen ``operationId``. Starting,
    updating and finishing an operation each store one log entry through
    ``services.chat.storeLog``; child operations reference their parent by the
    parent's ``operationId`` (not by the parent's log entry ID).
    """

    def __init__(self, services):
        """Initialize progress logger.

        Args:
            services: Services object for accessing chat service and workflow.
                This class reads ``services.workflow`` and calls
                ``services.chat.storeLog(workflow, logData)``.
        """
        self.services = services
        # operationId -> metadata dict (service, action, context, startTime,
        # parentOperationId) for operations that are currently running.
        self.activeOperations: Dict[str, Dict[str, Any]] = {}
        # operationIds that finished (or were reported as unknown once);
        # used to avoid emitting the same warning repeatedly.
        self.finishedOperations = set()
        # operationId -> ID of the log entry created when the operation started.
        self.operationLogIds: Dict[str, Any] = {}

    def startOperation(
        self,
        operationId: str,
        serviceName: str,
        actionName: str,
        context: str = "",
        parentOperationId: Optional[str] = None,
    ) -> None:
        """Start a new long-running operation.

        Args:
            operationId: Unique identifier for the operation
            serviceName: Name of the service (e.g., "Extract", "AI", "Generate")
            actionName: Name of the action being performed
            context: Additional context information
            parentOperationId: Optional parent operation ID (operationId of the
                parent operation) for hierarchical display. It is stored as
                ``parentId`` in the log entry.
        """
        # Remove from finished operations if it was there (restart scenarios).
        self.finishedOperations.discard(operationId)
        # Drop any log ID left over from a previous run of the same operation,
        # so a failed start log cannot leave a stale ID behind.
        self.operationLogIds.pop(operationId, None)

        self.activeOperations[operationId] = {
            'service': serviceName,
            'action': actionName,
            'context': context,
            'startTime': time.time(),
            # Parent's operationId, not the parent's log entry ID.
            'parentOperationId': parentOperationId,
        }

        logId = self._logProgress(
            operationId, 0.0, f"Starting {actionName}", parentOperationId=parentOperationId
        )
        if logId:
            self.operationLogIds[operationId] = logId
        logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}")

    def updateOperation(self, operationId: str, progress: float, statusUpdate: str = "") -> None:
        """Update progress for an operation.

        Updates for operations that already finished are ignored silently.
        Updates for operations that were never started log one warning; further
        updates for the same ID are then ignored silently as well.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0 (passed through as given)
            statusUpdate: Additional status information
        """
        if operationId not in self.activeOperations:
            if operationId not in self.finishedOperations:
                # Never started: warn once, then remember the ID so later
                # calls take the silent path.
                logger.warning(f"Operation {operationId} not found for progress update (operation never started)")
                self.finishedOperations.add(operationId)
            return

        op = self.activeOperations[operationId]
        context = f"{op['context']} {statusUpdate}".strip()
        # All logs of one operation (start/update/finish) share the same parent.
        parentOperationId = op.get('parentOperationId')
        self._logProgress(operationId, progress, context, parentOperationId=parentOperationId)
        logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}")

    def finishOperation(self, operationId: str, success: bool = True) -> None:
        """Complete an operation.

        Stores a final log entry ("Done" with progress 1.0 on success, "Failed"
        with progress 0.0 otherwise) and stops tracking the operation.

        Args:
            operationId: Unique identifier for the operation
            success: Whether the operation completed successfully
        """
        if operationId not in self.activeOperations:
            # Only warn once per unknown/already-finished operation.
            if operationId not in self.finishedOperations:
                logger.warning(f"Operation {operationId} not found for completion (operation never started or already finished)")
                self.finishedOperations.add(operationId)
            return

        op = self.activeOperations[operationId]
        finalProgress = 1.0 if success else 0.0
        status = "Done" if success else "Failed"
        parentOperationId = op.get('parentOperationId')

        # The completion log must be created BEFORE the operation is removed,
        # because _logProgress refuses to log for inactive operations.
        self._logProgress(operationId, finalProgress, status, parentOperationId=parentOperationId)

        duration = time.time() - op['startTime']
        logger.info(f"Completed operation {operationId}: {op['service']} - {op['action']} in {duration:.2f}s")

        del self.activeOperations[operationId]
        self.operationLogIds.pop(operationId, None)
        # Mark as finished so late updateOperation calls stay silent.
        self.finishedOperations.add(operationId)

    def _logProgress(
        self,
        operationId: str,
        progress: float,
        status: str,
        parentOperationId: Optional[str] = None,
    ) -> Optional[str]:
        """Create a standardized log entry for an active operation.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0
            status: Status information for the log entry
            parentOperationId: Optional parent operation ID (operationId of the
                parent operation); stored as ``parentId`` in the log entry.

        Returns:
            The created log entry ID, or None if the operation is not active,
            no workflow is available, or storing the entry failed.
        """
        if operationId not in self.activeOperations:
            return None

        op = self.activeOperations[operationId]
        message = f"{op['service']}"

        workflow = self.services.workflow
        if not workflow:
            logger.warning("Cannot log progress: no workflow available")
            return None

        # Diagnostic only: an inactive parent does not prevent the log entry.
        if parentOperationId and parentOperationId not in self.activeOperations:
            logger.debug(f"WARNING: Parent operation '{parentOperationId}' not found in activeOperations when creating log for '{operationId}'. Available operations: {list(self.activeOperations.keys())}. Child operation may appear at root level.")

        logData = {
            "workflowId": workflow.id,
            "message": message,
            "type": "info",
            "status": status,
            "progress": progress,
            "operationId": operationId,
            # Parent's operationId, not the parent's log entry ID.
            "parentId": parentOperationId,
        }

        # Progress logging is best-effort: a storage failure is reported but
        # must not break the operation being tracked.
        try:
            chatLog = self.services.chat.storeLog(workflow, logData)
            return chatLog.id if chatLog else None
        except Exception as e:
            logger.error(f"Failed to store progress log: {e}")
            return None

    def getOperationLogId(self, operationId: str) -> Optional[str]:
        """Get the log entry ID for an operation (the start log entry).

        Args:
            operationId: Unique identifier for the operation

        Returns:
            The log entry ID for the operation start, or None if not found
        """
        return self.operationLogIds.get(operationId)

    def getActiveOperations(self) -> Dict[str, Dict[str, Any]]:
        """Get all currently active operations.

        Returns:
            Dictionary mapping operationId to a copy of its metadata. Both the
            outer mapping and each metadata dict are copies, so callers cannot
            alter the logger's internal state through the result.
        """
        return {opId: dict(info) for opId, info in self.activeOperations.items()}

    def clearAllOperations(self) -> None:
        """Clear all tracked operations (for cleanup).

        Resets active operations, the finished-operation markers and the
        stored start-log IDs, so no state of earlier operations survives.
        """
        self.activeOperations.clear()
        self.finishedOperations.clear()
        self.operationLogIds.clear()
        logger.debug("Cleared all active operations")