125 lines
4.6 KiB
Python
125 lines
4.6 KiB
Python
"""
|
|
Progress Logger utility for standardized progress reporting in workflows.
|
|
Provides centralized progress management with start/update/complete methods.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProgressLogger:
    """Centralized progress logger for workflow operations.

    Keeps a registry of active operations keyed by operation ID and, on
    start, update and finish, hands a standardized progress record to
    ``workflowService.storeLog``. Storing a record is best-effort: a failure
    is logged and never propagates to the caller.
    """

    def __init__(self, workflowService, workflow):
        """Initialize progress logger.

        Args:
            workflowService: Object exposing ``storeLog(workflow, logData)``.
            workflow: Workflow object; its ``id`` attribute is written into
                every progress record as ``workflowId``.
        """
        self.workflowService = workflowService
        self.workflow = workflow
        # operationId -> {'service', 'action', 'context', 'startTime'}
        self.activeOperations: Dict[str, Dict[str, Any]] = {}

    def startOperation(self, operationId: str, serviceName: str, actionName: str, context: str = ""):
        """Start a new long-running operation.

        Registers the operation and emits an initial record with progress 0.0.
        Re-using the ID of an operation that is still active replaces that
        entry (and resets its start time); a warning is logged when it happens.

        Args:
            operationId: Unique identifier for the operation
            serviceName: Name of the service (e.g., "Extract", "AI", "Generate")
            actionName: Name of the action being performed
            context: Additional context information
        """
        if operationId in self.activeOperations:
            logger.warning("Operation %s is already active; restarting it", operationId)

        self.activeOperations[operationId] = {
            'service': serviceName,
            'action': actionName,
            'context': context,
            'startTime': time.time(),
        }
        self._logProgress(operationId, 0.0, f"Starting {actionName}")
        logger.debug("Started operation %s: %s - %s", operationId, serviceName, actionName)

    def updateOperation(self, operationId: str, progress: float, statusUpdate: str = ""):
        """Update progress for an operation.

        Unknown operation IDs are ignored with a warning.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0; values outside that
                range are clamped to it before being recorded.
            statusUpdate: Additional status information, appended to the
                context given at start.
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            logger.warning("Operation %s not found for progress update", operationId)
            return

        # Enforce the documented 0.0-1.0 contract so consumers of the record
        # never see an out-of-range value.
        clamped = min(1.0, max(0.0, progress))
        status = f"{op['context']} {statusUpdate}".strip()
        self._logProgress(operationId, clamped, status)
        logger.debug("Updated operation %s: %.2f - %s", operationId, clamped, status)

    def finishOperation(self, operationId: str, success: bool = True):
        """Complete an operation.

        Emits a final record (progress 1.0 / status "Done" on success,
        progress 0.0 / status "Failed" otherwise), logs the elapsed time and
        removes the operation from the active registry. Unknown operation IDs
        are ignored with a warning.

        Args:
            operationId: Unique identifier for the operation
            success: Whether the operation completed successfully
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            logger.warning("Operation %s not found for completion", operationId)
            return

        finalProgress = 1.0 if success else 0.0
        status = "Done" if success else "Failed"

        try:
            # The record must be created while the operation is still
            # registered: _logProgress looks the operation up by ID.
            self._logProgress(operationId, finalProgress, status)
        finally:
            # Always unregister, so a failed operation cannot linger as active.
            self.activeOperations.pop(operationId, None)

        duration = time.time() - op['startTime']
        if success:
            logger.info(
                "Completed operation %s: %s - %s in %.2fs",
                operationId, op['service'], op['action'], duration,
            )
        else:
            # Do not report a failed operation as "Completed".
            logger.warning(
                "Failed operation %s: %s - %s in %.2fs",
                operationId, op['service'], op['action'], duration,
            )

    def _logProgress(self, operationId: str, progress: float, status: str):
        """Create a standardized progress log entry.

        Does nothing when the operation is not registered. Any exception
        raised while building or storing the record is logged and swallowed.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0
            status: Status information for the log entry
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            return

        try:
            # Built inside the try block so that a missing workflow / id is
            # treated like any other storage failure instead of crashing the
            # caller.
            logData = {
                "workflowId": self.workflow.id,
                "message": f"Service {op['service']}",
                "type": "info",
                "status": status,
                "progress": progress,
            }
            self.workflowService.storeLog(self.workflow, logData)
        except Exception as e:
            logger.error("Failed to store progress log: %s", e)

    def getActiveOperations(self) -> Dict[str, Dict[str, Any]]:
        """Get all currently active operations.

        Returns:
            Dictionary mapping operation ID to a copy of that operation's
            data; modifying the result does not affect the logger's state.
        """
        return {opId: dict(op) for opId, op in self.activeOperations.items()}

    def clearAllOperations(self):
        """Clear all active operations (for cleanup)."""
        self.activeOperations.clear()
        logger.debug("Cleared all active operations")
|