gateway/modules/shared/progressLogger.py

146 lines
5.7 KiB
Python

"""
Progress Logger utility for standardized progress reporting in workflows.
Provides centralized progress management with start/update/complete methods.
"""
import time
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
class ProgressLogger:
"""Centralized progress logger for workflow operations."""
def __init__(self, services):
"""Initialize progress logger.
Args:
services: Services object for accessing chat service and workflow
"""
self.services = services
self.activeOperations = {}
self.finishedOperations = set() # Track finished operations to avoid repeated warnings
def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = ""):
"""Start a new long-running operation.
Args:
operationId: Unique identifier for the operation
serviceName: Name of the service (e.g., "Extract", "AI", "Generate")
actionName: Name of the action being performed
context: Additional context information
"""
# Remove from finished operations if it was there (for restart scenarios)
self.finishedOperations.discard(operationId)
self.activeOperations[operationId] = {
'service': serviceName,
'action': actionName,
'context': context,
'startTime': time.time()
}
self._logProgress(operationId, 0.0, f"Starting {actionName}")
logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}")
def updateOperation(self, operationId: str, progress: float, statusUpdate: str = ""):
"""Update progress for an operation.
Args:
operationId: Unique identifier for the operation
progress: Progress value between 0.0 and 1.0
statusUpdate: Additional status information
"""
if operationId not in self.activeOperations:
# Only warn once per operation ID if it was already finished
if operationId in self.finishedOperations:
# Operation already finished - silently ignore subsequent updates
return
else:
# Operation never started - log warning once and mark as problematic
logger.warning(f"Operation {operationId} not found for progress update (operation never started)")
self.finishedOperations.add(operationId) # Prevent repeated warnings
return
op = self.activeOperations[operationId]
context = f"{op['context']} {statusUpdate}".strip()
self._logProgress(operationId, progress, context)
logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}")
def finishOperation(self, operationId: str, success: bool = True):
"""Complete an operation.
Args:
operationId: Unique identifier for the operation
success: Whether the operation completed successfully
"""
if operationId not in self.activeOperations:
# Only warn once if operation was already finished
if operationId not in self.finishedOperations:
logger.warning(f"Operation {operationId} not found for completion (operation never started or already finished)")
self.finishedOperations.add(operationId)
return
op = self.activeOperations[operationId]
finalProgress = 1.0 if success else 0.0
status = "Done" if success else "Failed"
# Create completion log BEFORE removing from activeOperations
self._logProgress(operationId, finalProgress, status)
# Log completion time
duration = time.time() - op['startTime']
logger.info(f"Completed operation {operationId}: {op['service']} - {op['action']} in {duration:.2f}s")
# Remove from active operations AFTER creating the log
del self.activeOperations[operationId]
# Mark as finished to prevent repeated warnings from updateOperation calls
self.finishedOperations.add(operationId)
def _logProgress(self, operationId: str, progress: float, status: str):
"""Create standardized ChatLog entry.
Args:
operationId: Unique identifier for the operation
progress: Progress value between 0.0 and 1.0
status: Status information for the log entry
"""
if operationId not in self.activeOperations:
return
op = self.activeOperations[operationId]
message = f"Service {op['service']}"
workflow = self.services.workflow
if not workflow:
logger.warning(f"Cannot log progress: no workflow available")
return
logData = {
"workflowId": workflow.id,
"message": message,
"type": "info",
"status": status,
"progress": progress
}
try:
self.services.chat.storeLog(workflow, logData)
except Exception as e:
logger.error(f"Failed to store progress log: {e}")
def getActiveOperations(self) -> Dict[str, Dict[str, Any]]:
"""Get all currently active operations.
Returns:
Dictionary of active operations
"""
return self.activeOperations.copy()
def clearAllOperations(self):
"""Clear all active operations (for cleanup)."""
self.activeOperations.clear()
self.finishedOperations.clear()
logger.debug("Cleared all active operations")