181 lines
7.9 KiB
Python
181 lines
7.9 KiB
Python
"""
|
|
Progress Logger utility for standardized progress reporting in workflows.
|
|
Provides centralized progress management with start/update/complete methods.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProgressLogger:
    """Centralized progress logger for workflow operations with hierarchical support.

    Operations are tracked by a caller-supplied ``operationId``. Each
    start/update/finish call stores one log entry through
    ``services.chat.storeLog`` carrying the operation's progress, a status
    text and, optionally, the ``operationId`` of a parent operation.
    """

    def __init__(self, services):
        """Initialize progress logger.

        Args:
            services: Services object; this class reads ``services.workflow``
                and calls ``services.chat.storeLog(workflow, logData)``.
        """
        self.services = services
        # operationId -> metadata dict (service, action, context, startTime,
        # parentOperationId) for operations that have started but not finished.
        self.activeOperations: Dict[str, Dict[str, Any]] = {}
        # operationIds that were finished (or already warned about), so later
        # calls for them do not produce repeated warnings.
        self.finishedOperations = set()
        # operationId -> ID of the log entry created by startOperation.
        self.operationLogIds: Dict[str, str] = {}

    def startOperation(
        self,
        operationId: str,
        serviceName: str,
        actionName: str,
        context: str = "",
        parentOperationId: Optional[str] = None,
    ) -> None:
        """Start a new long-running operation.

        Registers the operation as active and stores an initial log entry
        with progress 0.0.

        Args:
            operationId: Unique identifier for the operation
            serviceName: Name of the service (e.g., "Extract", "AI", "Generate")
            actionName: Name of the action being performed
            context: Additional context information
            parentOperationId: Optional operationId of the parent operation
                for hierarchical display; stored as ``parentId`` in the log
                entry.
        """
        # A restarted operation must no longer count as finished, otherwise
        # its updates would be silently ignored.
        self.finishedOperations.discard(operationId)

        self.activeOperations[operationId] = {
            'service': serviceName,
            'action': actionName,
            'context': context,
            'startTime': time.time(),
            # The parent's operationId, not the parent's log entry ID.
            'parentOperationId': parentOperationId,
        }

        # The operation must be registered before logging: _logProgress only
        # logs for operations present in activeOperations.
        logId = self._logProgress(
            operationId, 0.0, f"Starting {actionName}", parentOperationId=parentOperationId
        )
        if logId:
            self.operationLogIds[operationId] = logId
        logger.debug(f"Started operation {operationId}: {serviceName} - {actionName}")

    def updateOperation(self, operationId: str, progress: float, statusUpdate: str = "") -> None:
        """Update progress for an operation.

        Updates for operations that already finished are ignored silently.
        An update for an operation that was never started logs one warning;
        the ID is then remembered so the warning is not repeated.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0
            statusUpdate: Additional status information, appended to the
                context given at start
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            if operationId not in self.finishedOperations:
                logger.warning(f"Operation {operationId} not found for progress update (operation never started)")
                # Remember the ID so this warning is emitted only once.
                self.finishedOperations.add(operationId)
            return

        context = f"{op['context']} {statusUpdate}".strip()
        # All logs of an operation (start/update/finish) share the same parent.
        parentOperationId = op.get('parentOperationId')
        self._logProgress(operationId, progress, context, parentOperationId=parentOperationId)
        logger.debug(f"Updated operation {operationId}: {progress:.2f} - {context}")

    def finishOperation(self, operationId: str, success: bool = True) -> None:
        """Complete an operation.

        Stores a final log entry (progress 1.0 / "Done" on success,
        0.0 / "Failed" otherwise), logs the elapsed time and removes the
        operation from the active set.

        Args:
            operationId: Unique identifier for the operation
            success: Whether the operation completed successfully
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            # Warn only once per unknown/already finished operation.
            if operationId not in self.finishedOperations:
                logger.warning(f"Operation {operationId} not found for completion (operation never started or already finished)")
                self.finishedOperations.add(operationId)
            return

        finalProgress = 1.0 if success else 0.0
        status = "Done" if success else "Failed"
        # All logs of an operation (start/update/finish) share the same parent.
        parentOperationId = op.get('parentOperationId')

        # The completion log must be created BEFORE the operation is removed:
        # _logProgress refuses to log for operations that are not active.
        self._logProgress(operationId, finalProgress, status, parentOperationId=parentOperationId)

        duration = time.time() - op['startTime']
        logger.info(f"Completed operation {operationId}: {op['service']} - {op['action']} in {duration:.2f}s")

        del self.activeOperations[operationId]
        self.operationLogIds.pop(operationId, None)

        # Mark as finished so late updateOperation calls are ignored quietly.
        self.finishedOperations.add(operationId)

    def _logProgress(
        self,
        operationId: str,
        progress: float,
        status: str,
        parentOperationId: Optional[str] = None,
    ) -> Optional[str]:
        """Create standardized ChatLog entry.

        Args:
            operationId: Unique identifier for the operation
            progress: Progress value between 0.0 and 1.0
            status: Status information for the log entry
            parentOperationId: Optional operationId of the parent operation;
                written to the entry as ``parentId``.

        Returns:
            The created log entry ID, or None if the operation is not active,
            no workflow is available, or storing the entry failed.
        """
        op = self.activeOperations.get(operationId)
        if op is None:
            return None

        workflow = self.services.workflow
        if not workflow:
            logger.warning("Cannot log progress: no workflow available")
            return None

        logData = {
            "workflowId": workflow.id,
            "message": f"Service {op['service']}",
            "type": "info",
            "status": status,
            "progress": progress,
            "operationId": operationId,
            # The parent's operationId, not the parent's log entry ID.
            "parentId": parentOperationId,
        }

        # Progress reporting is best-effort: a storage failure is logged and
        # reported as None instead of propagating to the caller.
        try:
            chatLog = self.services.chat.storeLog(workflow, logData)
            return chatLog.id if chatLog else None
        except Exception as e:
            logger.error(f"Failed to store progress log: {e}")
            return None

    def getOperationLogId(self, operationId: str) -> Optional[str]:
        """Get the log entry ID for an operation (the start log entry).

        Args:
            operationId: Unique identifier for the operation

        Returns:
            The log entry ID recorded when the operation was started, or None
            if none is recorded (the entry is dropped when the operation
            finishes or when all operations are cleared).
        """
        return self.operationLogIds.get(operationId)

    def getActiveOperations(self) -> Dict[str, Dict[str, Any]]:
        """Get all currently active operations.

        Returns:
            Shallow copy of the active-operations mapping; the per-operation
            metadata dicts are the same objects held internally.
        """
        return self.activeOperations.copy()

    def clearAllOperations(self) -> None:
        """Clear all tracked operation state (for cleanup)."""
        self.activeOperations.clear()
        self.finishedOperations.clear()
        # Also drop the start-log IDs; otherwise getOperationLogId would keep
        # returning IDs for operations that no longer exist.
        self.operationLogIds.clear()
        logger.debug("Cleared all active operations")