# Copyright (c) 2026 PowerOn AG # All rights reserved. """Per-run NDJSON logs for persisted workflow-automation runs.""" from __future__ import annotations import asyncio import json import logging import os from datetime import datetime, timezone from typing import Any, Dict, Optional from modules.shared.configuration import APP_CONFIG from modules.shared.debugLogger import ensureDir, resolve_app_log_dir logger = logging.getLogger(__name__) RUN_FILE_LOG_RELATIVE_ROOT = "workflow_automation_runs" CONTEXT_KEY = "_waRunFileLogRelativeDir" EXECUTION_FILENAME = "node_execution.ndjson" CONTEXT_SNAPSHOT_FILENAME = "workflow_context.ndjson" def workflowAutomationRunFileLoggingEnabled() -> bool: """True when NDJSON files should be written for each persisted run.""" raw = APP_CONFIG.get("APP_WORKFLOW_AUTOMATION_RUN_FILE_LOGGING") or APP_CONFIG.get("APP_GRAPHICAL_EDITOR_RUN_FILE_LOGGING", False) if isinstance(raw, bool): return raw s = str(raw).strip().lower() return s in ("1", "true", "yes", "on") def mergeRunContextWithWaLogPrefix( baseContext: Optional[Dict[str, Any]], incoming: Dict[str, Any], ) -> Dict[str, Any]: """Copy ``CONTEXT_KEY`` from *baseContext* onto *incoming* if present (pause paths).""" out = dict(incoming or {}) prev = (baseContext or {}).get(CONTEXT_KEY) if prev is not None: out[CONTEXT_KEY] = prev return out def mergePersistedRunContext( workflowAutomationInterface: Any, runId: str, replacement: Dict[str, Any], ) -> Dict[str, Any]: """``{**db_context, **replacement}`` so *_waRunFileLogRelativeDir* and other keys survive pause updates.""" prev = dict((workflowAutomationInterface.getRun(runId) or {}).get("context") or {}) return {**prev, **(replacement or {})} class RunFileLogger: """Append-only NDJSON log for one run folder under ``resolve_app_log_dir()``.""" __slots__ = ("_exec_path", "_ctx_path", "_lock", "_run_id") def __init__(self, runId: str, absoluteRunDir: str) -> None: self._run_id = runId ensureDir(absoluteRunDir) self._exec_path = os.path.join(absoluteRunDir, EXECUTION_FILENAME) self._ctx_path = os.path.join(absoluteRunDir, CONTEXT_SNAPSHOT_FILENAME) self._lock = asyncio.Lock() @property def runId(self) -> str: return self._run_id @staticmethod def freshRunSubdirectoryName(runId: str) -> str: ts = datetime.now(timezone.utc).strftime("%Y_%m_%d_%H_%M_%S") return f"{ts}__{runId}" @staticmethod def relativeRunPath(subdirName: str) -> str: """Path relative to ``APP_LOGGING_LOG_DIR`` (POSIX-style segments).""" return "/".join((RUN_FILE_LOG_RELATIVE_ROOT, subdirName)) @classmethod def bootstrapNewRun(cls, workflowAutomationInterface: Any, runId: str, runContext: Dict[str, Any]) -> RunFileLogger | None: """Create filesystem folder + persist CONTEXT_KEY via ``updateRun``.""" if not workflowAutomationRunFileLoggingEnabled(): return None if not workflowAutomationInterface or not runId: return None subdir = cls.freshRunSubdirectoryName(runId) rel = cls.relativeRunPath(subdir) base = resolve_app_log_dir() absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir) merged = dict(runContext or {}) merged[CONTEXT_KEY] = rel try: workflowAutomationInterface.updateRun(runId, context=merged) except Exception as ex: logger.warning("WaRunFileLog: could not persist log dir on run=%s: %s", runId, ex) return None logger.info( "WaRunFileLog: created run folder %s (run=%s)", absolute, runId, ) return cls(runId, absolute) @classmethod def openFromRunRecord(cls, workflowAutomationInterface: Any, runId: str) -> RunFileLogger | None: """Open logger for an existing run using CONTEXT_KEY from DB.""" if not workflowAutomationRunFileLoggingEnabled(): return None if not workflowAutomationInterface or not runId: return None try: run = workflowAutomationInterface.getRun(runId) or {} except Exception as ex: logger.debug("WaRunFileLog: getRun failed run=%s: %s", runId, ex) return None rel = (run.get("context") or {}).get(CONTEXT_KEY) if not rel or not isinstance(rel, str): return None base_norm = os.path.realpath(resolve_app_log_dir()) allowed_root = os.path.realpath(os.path.join(base_norm, RUN_FILE_LOG_RELATIVE_ROOT)) cand = os.path.realpath(os.path.join(base_norm, *rel.replace("\\", "/").split("/"))) if cand != allowed_root and not cand.startswith(allowed_root + os.sep): logger.warning( "WaRunFileLog: path outside log root denied for run=%s rel=%s", runId, rel, ) return None absolute = cand return cls(runId, absolute) @classmethod def findExistingAbsoluteDir(cls, runId: str) -> Optional[str]: """If a folder named ``*{timestamp}__{run_id}`` exists under the log root, return its absolute path.""" root = os.path.realpath(os.path.join(resolve_app_log_dir(), RUN_FILE_LOG_RELATIVE_ROOT)) if not os.path.isdir(root): return None suffix = f"__{runId}" try: names = sorted((n for n in os.listdir(root) if n.endswith(suffix)), reverse=True) except OSError: return None if not names: return None cand = os.path.realpath(os.path.join(root, names[0])) allowed_root = root if cand != allowed_root and not cand.startswith(allowed_root + os.sep): return None return cand if os.path.isdir(cand) else None @classmethod def ensureAttached(cls, workflowAutomationInterface: Any, runId: str) -> RunFileLogger | None: """Open logger from DB, or reattach an on-disk folder for *runId*, or create a new one.""" opened = cls.openFromRunRecord(workflowAutomationInterface, runId) if opened is not None: return opened if not workflowAutomationRunFileLoggingEnabled(): return None if not workflowAutomationInterface or not runId: return None try: run = workflowAutomationInterface.getRun(runId) or {} except Exception as ex: logger.debug("WaRunFileLog: ensure getRun failed run=%s: %s", runId, ex) return None prev_ctx = dict(run.get("context") or {}) existing_abs = cls.findExistingAbsoluteDir(runId) if existing_abs: base_norm = os.path.realpath(resolve_app_log_dir()) rel = os.path.relpath(existing_abs, base_norm).replace(os.sep, "/") merged = {**prev_ctx, CONTEXT_KEY: rel} try: workflowAutomationInterface.updateRun(runId, context=merged) except Exception as ex: logger.warning("WaRunFileLog: reattach persist failed run=%s: %s", runId, ex) return None logger.info("WaRunFileLog: reattached existing folder for run=%s -> %s", runId, existing_abs) return cls(runId, existing_abs) subdir = cls.freshRunSubdirectoryName(runId) rel = cls.relativeRunPath(subdir) base = resolve_app_log_dir() absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir) merged = {**prev_ctx, CONTEXT_KEY: rel} try: workflowAutomationInterface.updateRun(runId, context=merged) except Exception as ex: logger.warning("WaRunFileLog: ensure new folder persist failed run=%s: %s", runId, ex) return None logger.info("WaRunFileLog: created late attach folder %s (run=%s)", absolute, runId) return cls(runId, absolute) async def appendNodeExecutionLine(self, record: Dict[str, Any]) -> None: line = json.dumps(record, ensure_ascii=False, default=str) async with self._lock: try: with open(self._exec_path, "a", encoding="utf-8") as f: f.write(line + "\n") except Exception as ex: logger.warning("WaRunFileLog: append execution failed run=%s: %s", self._run_id, ex) async def appendContextSnapshotLine(self, record: Dict[str, Any]) -> None: line = json.dumps(record, ensure_ascii=False, default=str) async with self._lock: try: with open(self._ctx_path, "a", encoding="utf-8") as f: f.write(line + "\n") except Exception as ex: logger.warning("WaRunFileLog: append context snapshot failed run=%s: %s", self._run_id, ex)