From 41b2113bd58d77cede3b0ff916e4f762bb4d2c54 Mon Sep 17 00:00:00 2001 From: Ida Date: Thu, 14 May 2026 13:06:07 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20extract=20content=20node=20angepasst=20?= =?UTF-8?q?f=C3=BCr=20mehr=20optionen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nodeDefinitions/context.py | 211 +++++++ modules/shared/debugLogger.py | 6 + .../workflows/automation2/executionEngine.py | 481 +++++++++++++++- .../automation2/executors/inputExecutor.py | 17 +- .../graphicalEditorRunFileLogger.py | 215 ++++++++ .../methodContext/actions/extractContent.py | 514 +++++++++++++++++- .../methodContext/actions/setContext.py | 17 +- .../workflow/test_extract_content_handover.py | 116 ++++ .../unit/workflow/test_phase3_context_node.py | 20 +- 9 files changed, 1564 insertions(+), 33 deletions(-) create mode 100644 modules/workflows/automation2/graphicalEditorRunFileLogger.py diff --git a/modules/features/graphicalEditor/nodeDefinitions/context.py b/modules/features/graphicalEditor/nodeDefinitions/context.py index 52ff3a8b..26c5b788 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/context.py +++ b/modules/features/graphicalEditor/nodeDefinitions/context.py @@ -59,6 +59,207 @@ CONTEXT_NODES = [ "auch kuenftige Nicht-Bild-Typen bleiben erhalten)." ), }, + { + "name": "outputMode", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "blob", "label": t("Ausgabe: ein Textblock (blob)")}, + {"value": "lines", "label": t("Ausgabe: Zeilen / Segmente")}, + {"value": "pages", "label": t("Ausgabe: nach Seite (z. B. PDF)")}, + {"value": "chunks", "label": t("Ausgabe: Chunks (fixe Groesse)")}, + {"value": "structured", "label": t("Ausgabe: Parts als Liste")}, + ] + }, + "default": "lines", + "description": t( + "Wie die extrahierten Inhalte unter ``presentation`` strukturiert werden " + "(zusaetzlich zu den unveraenderten ``parts`` im Handover)." + ), + }, + { + "name": "splitBy", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "newline", "label": t("Trennen: Zeilenumbruch")}, + {"value": "paragraph", "label": t("Trennen: Absatz (Leerzeilen)")}, + {"value": "sentence", "label": t("Trennen: Saetze (heuristisch)")}, + ] + }, + "default": "newline", + "description": t( + "Gueltig fuer ``outputMode`` lines und chunks: welches Trennzeichen der " + "zusammenhaengende Klartext zuerst erhaelt." + ), + }, + { + "name": "chunkSizeUnit", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "dependsOn": "outputMode", + "showWhen": ["chunks"], + "options": [ + {"value": "tokens", "label": t("Chunk-Groesse: Tokens (approx. ~4 Zeichen)")}, + {"value": "characters", "label": t("Chunk-Groesse: Zeichen")}, + {"value": "words", "label": t("Chunk-Groesse: Woerter")}, + ] + }, + "default": "tokens", + "description": t("Einheit fuer ``chunkSize`` / ``chunkOverlap`` wenn outputMode chunks."), + }, + { + "name": "chunkSize", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "dependsOn": "outputMode", + "showWhen": ["chunks"], + "options": [ + {"value": "256", "label": "256"}, + {"value": "500", "label": "500"}, + {"value": "1000", "label": "1000"}, + {"value": "2000", "label": "2000"}, + {"value": "4000", "label": "4000"}, + ] + }, + "default": "500", + "description": t("Zielgroesse pro Chunk (siehe chunkSizeUnit). Nur bei outputMode chunks."), + }, + { + "name": "chunkOverlap", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "dependsOn": "outputMode", + "showWhen": ["chunks"], + "options": [ + {"value": "0", "label": "0"}, + {"value": "25", "label": "25"}, + {"value": "50", "label": "50"}, + {"value": "100", "label": "100"}, + {"value": "200", "label": "200"}, + ] + }, + "default": "0", + "description": t("Ueberlappung zwischen aufeinanderfolgenden Chunks (gleiche Einheit wie chunkSize)."), + }, + { + "name": "filterEmptyLines", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Ja")}, + {"value": "false", "label": t("Nein")}, + ] + }, + "default": "true", + "description": t("Leere bzw. nur-Whitespace-Segmente bei lines/chunks entfernen."), + }, + { + "name": "trimWhitespace", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Ja")}, + {"value": "false", "label": t("Nein")}, + ] + }, + "default": "true", + "description": t("Fuehrende und nachfolgende Leerzeichen pro Segment trimmen."), + }, + { + "name": "includeLineNumbers", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Ja")}, + {"value": "false", "label": t("Nein")}, + ] + }, + "default": "false", + "description": t("Bei lines: jedem Eintrag eine Zeilennummer (1-based) zuweisen."), + }, + { + "name": "includeMetadata", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Ja")}, + {"value": "false", "label": t("Nein")}, + ] + }, + "default": "false", + "description": t("Dateiname und einfache Offsets bei lines/chunks/pages an Eintraege haengen."), + }, + { + "name": "csvHeaderRow", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Ja")}, + {"value": "false", "label": t("Nein")}, + ] + }, + "default": "true", + "description": t( + "Bei CSV-Dateien: erste Zeile als Spaltenkoepfe interpretieren " + "und ``csvRows`` als Liste von Objekten in ``presentation`` schreiben." + ), + }, + { + "name": "pdfExtractMode", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "text", "label": t("PDF/Parts: Text & Tabellen (keine Bild-Parts)")}, + {"value": "tables", "label": t("PDF/Parts: nur Tabellen-Parts")}, + {"value": "images", "label": t("PDF/Parts: nur Bild-Parts")}, + {"value": "all", "label": t("PDF/Parts: alle Typgruppen")}, + ] + }, + "default": "text", + "description": t( + "Filtert fuer die Presentation-Schicht nach typeGroup/MIME " + "(gilt fuer alle Dokumenttypen analog, nicht nur PDF)." + ), + }, + { + "name": "markdownPreserveFormatting", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "true", "label": t("Markdown beibehalten")}, + {"value": "false", "label": t("zu vereinfachtem Klartext reduzieren")}, + ] + }, + "default": "false", + "description": t( + "Bei text/markdown-Parts: leichte Entfernung von Markup-Zeichen wenn false." + ), + }, ], "inputs": 1, "outputs": 1, @@ -79,6 +280,16 @@ CONTEXT_NODES = [ "recommended": True, "type": "Any", }, + { + "path": ["documents", 0, "documentData", "presentation"], + "pickerLabel": t("Presentation (strukturierte Sicht)"), + "detail": t( + "Nur die konfigurierte Ausgabe-Struktur (blob/lines/pages/chunks/structured); " + "unveraenderte Roh-Parts bleiben im umschliessenden Handover." + ), + "recommended": False, + "type": "Any", + }, { "path": ["response"], "pickerLabel": t("Nur Text"), diff --git a/modules/shared/debugLogger.py b/modules/shared/debugLogger.py index d1b22abc..9062ed53 100644 --- a/modules/shared/debugLogger.py +++ b/modules/shared/debugLogger.py @@ -19,6 +19,12 @@ def _resolveLogDir() -> str: logDir = os.path.join(gatewayDir, logDir) return logDir + +def resolve_app_log_dir() -> str: + """Absolute filesystem path for ``APP_LOGGING_LOG_DIR``.""" + return _resolveLogDir() + + def ensureDir(path: str) -> None: """Create directory if it does not exist.""" os.makedirs(path, exist_ok=True) diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 9df8cf9b..5f6a8592 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -32,6 +32,11 @@ from modules.features.graphicalEditor.portTypes import normalizeToSchema, wrapTr from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError +from modules.workflows.automation2.graphicalEditorRunFileLogger import ( + GraphicalEditorRunFileLogger, + graphical_editor_run_file_logging_enabled, + merge_run_context_with_ge_log_prefix, +) from modules.workflows.automation2.runEnvelope import normalize_run_envelope logger = logging.getLogger(__name__) @@ -291,6 +296,78 @@ def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error: logger.debug("Could not update AutoStepLog %s: %s", stepId, e) +def _ge_iso_timestamp() -> str: + """UTC timestamp for NDJSON logs (readable, milliseconds).""" + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + +async def _ge_log_node_finished( + file_logger: Optional[GraphicalEditorRunFileLogger], + *, + run_id: Optional[str], + node_outputs: Dict[str, Any], + run_envelope: Optional[Dict[str, Any]], + node_id: str, + node_type: str, + status: str, + input_snap: Optional[Dict[str, Any]], + output: Any = None, + error: Optional[str] = None, + duration_ms: Optional[int] = None, + retry_count: Optional[int] = None, + skip_reason: Optional[str] = None, + loop_index: Optional[int] = None, + loop_node_id: Optional[str] = None, + loop_item: Optional[Any] = None, +) -> None: + """Append one execution line + one workflow-context snapshot (NDJSON).""" + if file_logger is None or not run_id: + return + ts = _ge_iso_timestamp() + exec_rec: Dict[str, Any] = { + "timestamp": ts, + "runId": run_id, + "nodeId": node_id, + "nodeType": node_type, + "status": status, + "input": _stripBinaryValues(dict(input_snap or {})), + } + if skip_reason: + exec_rec["skipReason"] = skip_reason + if duration_ms is not None: + exec_rec["durationMs"] = duration_ms + if retry_count is not None: + exec_rec["retryCount"] = retry_count + if loop_index is not None: + exec_rec["loopIndex"] = loop_index + if loop_node_id is not None: + exec_rec["loopNodeId"] = loop_node_id + if loop_item is not None: + exec_rec["loopItem"] = _stripBinaryValues(loop_item) + if error is not None: + exec_rec["error"] = error + if output is not None: + exec_rec["output"] = ( + _stripBinaryValues(output) if isinstance(output, dict) else {"value": _stripBinaryValues(output)} + ) + await file_logger.append_node_execution_line(exec_rec) + + ctx_rec: Dict[str, Any] = { + "timestamp": ts, + "runId": run_id, + "afterNodeId": node_id, + "afterNodeType": node_type, + "afterStatus": status, + "nodeOutputsSnapshot": _serializableOutputs(node_outputs), + "runEnvelope": _stripBinaryValues(dict(run_envelope or {})), + } + if loop_index is not None: + ctx_rec["loopIndex"] = loop_index + if loop_node_id is not None: + ctx_rec["loopNodeId"] = loop_node_id + await file_logger.append_context_snapshot_line(ctx_rec) + + async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryDelaySeconds: float = 1.0): """Execute a node with optional retry policy from node parameters.""" params = node.get("parameters") or {} @@ -356,6 +433,7 @@ async def _run_post_loop_done_nodes( automation2_interface: Optional[Any], runId: Optional[str], processed_in_loop: Set[str], + ge_file_logger: Optional[GraphicalEditorRunFileLogger] = None, ) -> Optional[Dict[str, Any]]: """After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once.""" _prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids) @@ -395,6 +473,17 @@ async def _run_post_loop_done_nodes( _skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap) if _skId: _updateStepLog(automation2_interface, _skId, "skipped") + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="skipped", + input_snap=_skipSnap, + skip_reason=str(_skipSnap.get("_skipReason") or "inactive_branch"), + ) continue _dexec = _getExecutor(_dn.get("type", ""), services, automation2_interface) if not _dexec: @@ -415,21 +504,82 @@ async def _run_post_loop_done_nodes( _updateStepLog(automation2_interface, _dStepId, "completed", output=_dres if isinstance(_dres, dict) else {"value": _dres}, durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="completed", + input_snap=_dIn, + output=_dres, + duration_ms=_dDur, + retry_count=_dRetry, + ) except PauseForHumanTaskError: _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000)) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="completed", + input_snap=_dIn, + duration_ms=int((time.time() - _dStart) * 1000), + ) raise except PauseForEmailWaitError: _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000)) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="completed", + input_snap=_dIn, + duration_ms=int((time.time() - _dStart) * 1000), + ) raise except (_SubscriptionInactiveException, _BillingContextError): + _dFailDur = int((time.time() - _dStart) * 1000) _updateStepLog(automation2_interface, _dStepId, "failed", - error="Subscription/Billing error", durationMs=int((time.time() - _dStart) * 1000)) + error="Subscription/Billing error", durationMs=_dFailDur) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="failed", + input_snap=_dIn, + error="Subscription/Billing error", + duration_ms=_dFailDur, + ) raise except Exception as _dex: + _dFailDur2 = int((time.time() - _dStart) * 1000) _updateStepLog(automation2_interface, _dStepId, "failed", - error=str(_dex), durationMs=int((time.time() - _dStart) * 1000)) + error=str(_dex), durationMs=_dFailDur2) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=_dnid, + node_type=_dn.get("type", ""), + status="failed", + input_snap=_dIn, + error=str(_dex), + duration_ms=_dFailDur2, + ) raise processed_in_loop.update(_done_only) return None @@ -523,6 +673,7 @@ async def executeGraph( except Exception as valErr: logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr) + ge_file_logger: Optional[GraphicalEditorRunFileLogger] = None nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) if not runId and automation2_interface and workflowId and not is_resume: run_context = { @@ -560,6 +711,12 @@ async def executeGraph( ) runId = run.get("id") if run else None logger.info("executeGraph created run %s label=%s", runId, run_label) + if runId and graphical_editor_run_file_logging_enabled(): + ge_file_logger = GraphicalEditorRunFileLogger.bootstrap_new_run( + automation2_interface, + runId, + run_context, + ) env_for_run = normalize_run_envelope(run_envelope, user_id=userId) @@ -586,6 +743,17 @@ async def executeGraph( if runId: _activeRunContexts[runId] = context + if ( + graphical_editor_run_file_logging_enabled() + and automation2_interface + and runId + and ge_file_logger is None + ): + ge_file_logger = GraphicalEditorRunFileLogger.ensure_attached( + automation2_interface, + runId, + ) + skip_until_passed = bool(startAfterNodeId) processed_in_loop: Set[str] = set() _aggregateAccumulators: Dict[str, list] = {} @@ -648,28 +816,106 @@ async def executeGraph( _updateStepLog(automation2_interface, _rStepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_rDur, retryCount=_rRetry) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_rInputSnap, + output=result, + duration_ms=_rDur, + retry_count=_rRetry, + loop_index=next_index, + loop_node_id=loop_node_id, + loop_item=items[next_index], + ) logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry) if _resume_feedback_body_node_id and bnid == _resume_feedback_body_node_id: _resume_body_results.append(result) except PauseForHumanTaskError as e: + _rPauseDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", - durationMs=int((time.time() - _rStepStart) * 1000)) + durationMs=_rPauseDur) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_rInputSnap, + duration_ms=_rPauseDur, + loop_index=next_index, + loop_node_id=loop_node_id, + loop_item=items[next_index], + ) if automation2_interface: run_ctx = dict(run.get("context") or {}) run_ctx["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items} - automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx) + automation2_interface.updateRun(runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx) return {"success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs)} except PauseForEmailWaitError as e: + _rEmailDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", - durationMs=int((time.time() - _rStepStart) * 1000)) + durationMs=_rEmailDur) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_rInputSnap, + duration_ms=_rEmailDur, + loop_index=next_index, + loop_node_id=loop_node_id, + loop_item=items[next_index], + ) raise except (_SubscriptionInactiveException, _BillingContextError): + _rFailDurSb = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "failed", - error="Subscription/Billing error", durationMs=int((time.time() - _rStepStart) * 1000)) + error="Subscription/Billing error", durationMs=_rFailDurSb) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="failed", + input_snap=_rInputSnap, + error="Subscription/Billing error", + duration_ms=_rFailDurSb, + loop_index=next_index, + loop_node_id=loop_node_id, + loop_item=items[next_index], + ) raise except Exception as ex: + _rFailDurEx = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "failed", - error=str(ex), durationMs=int((time.time() - _rStepStart) * 1000)) + error=str(ex), durationMs=_rFailDurEx) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="failed", + input_snap=_rInputSnap, + error=str(ex), + duration_ms=_rFailDurEx, + loop_index=next_index, + loop_node_id=loop_node_id, + loop_item=items[next_index], + ) logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex) nodeOutputs[bnid] = {"error": str(ex), "success": False} if runId and automation2_interface: @@ -699,6 +945,7 @@ async def executeGraph( automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, + ge_file_logger=ge_file_logger, ) for i, node in enumerate(ordered): @@ -735,6 +982,17 @@ async def executeGraph( _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap) if _skipStepId: _updateStepLog(automation2_interface, _skipStepId, "skipped") + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="skipped", + input_snap=_skipInputSnap, + skip_reason=str(_skipInputSnap.get("_skipReason") or "inactive_branch"), + ) continue executor = _getExecutor(nodeType, services, automation2_interface) logger.info( @@ -806,10 +1064,20 @@ async def executeGraph( _activeOutputs[bnid] = None continue _bStepStart = time.time() + _bInputSnapAlways: Dict[str, Any] = {"_loopItem": _item, "_loopIndex": _idx} + for _bSnapSrc, _, _ in connectionMap.get(bnid, []): + if _bSnapSrc in _activeOutputs: + _bInputSnapAlways[_bSnapSrc] = _activeOutputs[_bSnapSrc] _bStepId = None if not _batchMode or _idx == 0 or _idx == len(items) - 1: - _bInputSnap = {"_loopItem": _item, "_loopIndex": _idx} - _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnap) + _bStepId = _createStepLog( + automation2_interface, + runId, + bnid, + body_node.get("type", ""), + "running", + _bInputSnapAlways, + ) try: bres, _bRetry = await _executeWithRetry(bexec, body_node, _activeCtx) if body_node.get("type") == "data.aggregate": @@ -822,17 +1090,48 @@ async def executeGraph( _aggregateTempChunks.setdefault(bnid, []).append(_aggregateAccumulators[bnid]) _aggregateAccumulators[bnid] = [] _activeOutputs[bnid] = bres + _bDur = int((time.time() - _bStepStart) * 1000) if _bStepId: - _bDur = int((time.time() - _bStepStart) * 1000) _updateStepLog(automation2_interface, _bStepId, "completed", output=bres if isinstance(bres, dict) else {"value": bres}, durationMs=_bDur, retryCount=_bRetry) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=_activeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_bInputSnapAlways, + output=bres, + duration_ms=_bDur, + retry_count=_bRetry, + loop_index=_idx, + loop_node_id=nodeId, + loop_item=_item, + ) if _loopConcurrency == 1: nodeOutputs[bnid] = bres except PauseForHumanTaskError as e: + _bHd = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", - durationMs=int((time.time() - _bStepStart) * 1000)) + durationMs=_bHd) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=_activeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_bInputSnapAlways, + duration_ms=_bHd, + loop_index=_idx, + loop_node_id=nodeId, + loop_item=_item, + ) if runId and automation2_interface: _run = automation2_interface.getRun(runId) or {} _run_ctx = dict(_run.get("context") or {}) @@ -840,19 +1139,66 @@ async def executeGraph( automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=_run_ctx) return {"_pause": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId} except PauseForEmailWaitError: + _bEd = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", - durationMs=int((time.time() - _bStepStart) * 1000)) + durationMs=_bEd) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=_activeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="completed", + input_snap=_bInputSnapAlways, + duration_ms=_bEd, + loop_index=_idx, + loop_node_id=nodeId, + loop_item=_item, + ) raise except (_SubscriptionInactiveException, _BillingContextError): + _bSb = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "failed", - error="Subscription/Billing error", durationMs=int((time.time() - _bStepStart) * 1000)) + error="Subscription/Billing error", durationMs=_bSb) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=_activeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="failed", + input_snap=_bInputSnapAlways, + error="Subscription/Billing error", + duration_ms=_bSb, + loop_index=_idx, + loop_node_id=nodeId, + loop_item=_item, + ) raise except Exception as ex: + _bFail = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "failed", - error=str(ex), durationMs=int((time.time() - _bStepStart) * 1000)) + error=str(ex), durationMs=_bFail) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=_activeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=bnid, + node_type=body_node.get("type", ""), + status="failed", + input_snap=_bInputSnapAlways, + error=str(ex), + duration_ms=_bFail, + loop_index=_idx, + loop_node_id=nodeId, + loop_item=_item, + ) logger.exception("executeGraph loop body node %s FAILED (iter %d): %s", bnid, _idx, ex) return {"_error": str(ex), "failedNode": bnid} @@ -932,11 +1278,31 @@ async def executeGraph( automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, + ge_file_logger=ge_file_logger, ) + _loopDurMs = int((time.time() - _stepStartMs) * 1000) + _loopStepOut = { + "iterationCount": len(items), + "items": len(items), + "concurrency": _loopConcurrency, + "batchMode": _batchMode, + } _updateStepLog(automation2_interface, _stepId, "completed", - output={"iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode}, - durationMs=int((time.time() - _stepStartMs) * 1000)) + output=_loopStepOut, + durationMs=_loopDurMs) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="completed", + input_snap=_loopInputSnap, + output=_loopStepOut, + duration_ms=_loopDurMs, + ) logger.info("executeGraph flow.loop done: %d iterations (concurrency=%d, batchMode=%s)", len(items), _loopConcurrency, _batchMode) elif _isBarrierNode(nodeType): if not _allMergePredecessorsReady(nodeId, connectionMap, nodeOutputs): @@ -952,6 +1318,24 @@ async def executeGraph( result, retryCount = await _executeWithRetry(executor, node, context) result = _normalizeResult(result, nodeType) nodeOutputs[nodeId] = result + _mergeDurMs = int((time.time() - _stepStartMs) * 1000) + _mergeTok = result.get("tokensUsed", 0) if isinstance(result, dict) else 0 + _updateStepLog(automation2_interface, _stepId, "completed", + output=result if isinstance(result, dict) else {"value": result}, + durationMs=_mergeDurMs, tokensUsed=_mergeTok, retryCount=retryCount) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="completed", + input_snap=_inputSnap, + output=result, + duration_ms=_mergeDurMs, + retry_count=retryCount, + ) else: _stepStartMs = time.time() _inputSnap = {} @@ -967,6 +1351,19 @@ async def executeGraph( _updateStepLog(automation2_interface, _stepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_durMs, tokensUsed=_tokens, retryCount=retryCount) + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="completed", + input_snap=_inputSnap, + output=result, + duration_ms=_durMs, + retry_count=retryCount, + ) logger.info( "executeGraph node %s done: result_type=%s result_keys=%s retries=%d duration=%dms", nodeId, @@ -976,8 +1373,23 @@ async def executeGraph( _durMs, ) except PauseForHumanTaskError as e: + _huPauseMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "completed", - durationMs=int((time.time() - _stepStartMs) * 1000)) + durationMs=_huPauseMs) + _ge_in = locals().get("_inputSnap") + if _ge_in is None: + _ge_in = locals().get("_loopInputSnap") or {} + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="completed", + input_snap=_ge_in, + duration_ms=_huPauseMs, + ) logger.info("executeGraph paused for human task %s", e.taskId) return { "success": False, @@ -988,8 +1400,23 @@ async def executeGraph( "nodeOutputs": _serializableOutputs(nodeOutputs), } except PauseForEmailWaitError as e: + _emailPauseMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "completed", - durationMs=int((time.time() - _stepStartMs) * 1000)) + durationMs=_emailPauseMs) + _ge_email_in = locals().get("_inputSnap") + if _ge_email_in is None: + _ge_email_in = locals().get("_loopInputSnap") or {} + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="completed", + input_snap=_ge_email_in, + duration_ms=_emailPauseMs, + ) logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId) try: from modules.interfaces.interfaceDbApp import getRootInterface @@ -1013,6 +1440,9 @@ async def executeGraph( "mandateId": context.get("mandateId"), "instanceId": context.get("instanceId"), } + if automation2_interface and e.runId: + prev_ctx = dict((automation2_interface.getRun(e.runId) or {}).get("context") or {}) + run_ctx = merge_run_context_with_ge_log_prefix(prev_ctx, run_ctx) automation2_interface.updateRun( e.runId, status="paused", @@ -1033,6 +1463,21 @@ async def executeGraph( nodeOutputs[nodeId] = {"error": str(e), "success": False} _durMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs) + _ge_fail_in = locals().get("_inputSnap") + if _ge_fail_in is None: + _ge_fail_in = locals().get("_loopInputSnap") or {} + await _ge_log_node_finished( + ge_file_logger, + run_id=runId, + node_outputs=nodeOutputs, + run_envelope=context.get("runEnvelope"), + node_id=nodeId, + node_type=nodeType, + status="failed", + input_snap=_ge_fail_in, + error=str(e), + duration_ms=_durMs, + ) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: diff --git a/modules/workflows/automation2/executors/inputExecutor.py b/modules/workflows/automation2/executors/inputExecutor.py index 22fa2eba..4ccef725 100644 --- a/modules/workflows/automation2/executors/inputExecutor.py +++ b/modules/workflows/automation2/executors/inputExecutor.py @@ -65,16 +65,23 @@ class InputExecutor: ) taskId = task.get("id") - self.automation2.updateRun( + from modules.workflows.automation2.graphicalEditorRunFileLogger import merge_persisted_run_context + + _pause_ctx = merge_persisted_run_context( + self.automation2, runId, - status="paused", - nodeOutputs=context.get("nodeOutputs"), - currentNodeId=nodeId, - context={ + { "connectionMap": context.get("connectionMap"), "inputSources": context.get("inputSources"), "orderedNodeIds": [n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")], }, ) + self.automation2.updateRun( + runId, + status="paused", + nodeOutputs=context.get("nodeOutputs"), + currentNodeId=nodeId, + context=_pause_ctx, + ) logger.info("InputExecutor node %s: created task %s, run %s paused", nodeId, taskId, runId) raise PauseForHumanTaskError(runId=runId, taskId=taskId, nodeId=nodeId) diff --git a/modules/workflows/automation2/graphicalEditorRunFileLogger.py b/modules/workflows/automation2/graphicalEditorRunFileLogger.py new file mode 100644 index 00000000..ac28ddb1 --- /dev/null +++ b/modules/workflows/automation2/graphicalEditorRunFileLogger.py @@ -0,0 +1,215 @@ +# Copyright (c) 2025 Patrick Motsch +"""Per-run NDJSON logs for persisted Automation2 / graphical-editor runs.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +from modules.shared.configuration import APP_CONFIG +from modules.shared.debugLogger import ensureDir, resolve_app_log_dir + +logger = logging.getLogger(__name__) + + +RUN_FILE_LOG_RELATIVE_ROOT = "graphical_editor_runs" +CONTEXT_KEY = "_geRunFileLogRelativeDir" +EXECUTION_FILENAME = "node_execution.ndjson" +CONTEXT_SNAPSHOT_FILENAME = "workflow_context.ndjson" + + +def graphical_editor_run_file_logging_enabled() -> bool: + """True when NDJSON files should be written for each persisted run.""" + raw = APP_CONFIG.get("APP_GRAPHICAL_EDITOR_RUN_FILE_LOGGING", False) + if isinstance(raw, bool): + return raw + s = str(raw).strip().lower() + return s in ("1", "true", "yes", "on") + + +def merge_run_context_with_ge_log_prefix( + base_context: Optional[Dict[str, Any]], + incoming: Dict[str, Any], +) -> Dict[str, Any]: + """Copy ``CONTEXT_KEY`` from *base_context* onto *incoming* if present (pause paths).""" + out = dict(incoming or {}) + prev = (base_context or {}).get(CONTEXT_KEY) + if prev is not None: + out[CONTEXT_KEY] = prev + return out + + +def merge_persisted_run_context( + automation2_interface: Any, + run_id: str, + replacement: Dict[str, Any], +) -> Dict[str, Any]: + """``{**db_context, **replacement}`` so *_geRunFileLogRelativeDir* and other keys survive pause updates.""" + prev = dict((automation2_interface.getRun(run_id) or {}).get("context") or {}) + return {**prev, **(replacement or {})} + + +class GraphicalEditorRunFileLogger: + """Append-only NDJSON log for one run folder under ``resolve_app_log_dir()``.""" + + __slots__ = ("_exec_path", "_ctx_path", "_lock", "_run_id") + + def __init__(self, run_id: str, absolute_run_dir: str) -> None: + self._run_id = run_id + ensureDir(absolute_run_dir) + self._exec_path = os.path.join(absolute_run_dir, EXECUTION_FILENAME) + self._ctx_path = os.path.join(absolute_run_dir, CONTEXT_SNAPSHOT_FILENAME) + self._lock = asyncio.Lock() + + @property + def run_id(self) -> str: + return self._run_id + + @staticmethod + def fresh_run_subdirectory_name(run_id: str) -> str: + ts = datetime.now(timezone.utc).strftime("%Y_%m_%d_%H_%M_%S") + return f"{ts}__{run_id}" + + @staticmethod + def relative_run_path(subdir_name: str) -> str: + """Path relative to ``APP_LOGGING_LOG_DIR`` (POSIX-style segments).""" + return "/".join((RUN_FILE_LOG_RELATIVE_ROOT, subdir_name)) + + @classmethod + def bootstrap_new_run(cls, automation2_interface: Any, run_id: str, run_context: Dict[str, Any]) -> GraphicalEditorRunFileLogger | None: + """Create filesystem folder + persist CONTEXT_KEY via ``updateRun``.""" + if not graphical_editor_run_file_logging_enabled(): + return None + if not automation2_interface or not run_id: + return None + subdir = cls.fresh_run_subdirectory_name(run_id) + rel = cls.relative_run_path(subdir) + base = resolve_app_log_dir() + absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir) + + merged = dict(run_context or {}) + merged[CONTEXT_KEY] = rel + try: + automation2_interface.updateRun(run_id, context=merged) + except Exception as ex: + logger.warning("GeRunFileLog: could not persist log dir on run=%s: %s", run_id, ex) + return None + + logger.info( + "GeRunFileLog: created run folder %s (run=%s)", + absolute, + run_id, + ) + return cls(run_id, absolute) + + @classmethod + def open_from_run_record(cls, automation2_interface: Any, run_id: str) -> GraphicalEditorRunFileLogger | None: + """Open logger for an existing run using CONTEXT_KEY from DB.""" + if not graphical_editor_run_file_logging_enabled(): + return None + if not automation2_interface or not run_id: + return None + try: + run = automation2_interface.getRun(run_id) or {} + except Exception as ex: + logger.debug("GeRunFileLog: getRun failed run=%s: %s", run_id, ex) + return None + rel = (run.get("context") or {}).get(CONTEXT_KEY) + if not rel or not isinstance(rel, str): + return None + base_norm = os.path.realpath(resolve_app_log_dir()) + allowed_root = os.path.realpath(os.path.join(base_norm, RUN_FILE_LOG_RELATIVE_ROOT)) + cand = os.path.realpath(os.path.join(base_norm, *rel.replace("\\", "/").split("/"))) + if cand != allowed_root and not cand.startswith(allowed_root + os.sep): + logger.warning( + "GeRunFileLog: path outside log root denied for run=%s rel=%s", + run_id, + rel, + ) + return None + absolute = cand + return cls(run_id, absolute) + + @classmethod + def find_existing_absolute_dir(cls, run_id: str) -> Optional[str]: + """If a folder named ``*{timestamp}__{run_id}`` exists under the log root, return its absolute path.""" + root = os.path.realpath(os.path.join(resolve_app_log_dir(), RUN_FILE_LOG_RELATIVE_ROOT)) + if not os.path.isdir(root): + return None + suffix = f"__{run_id}" + try: + names = sorted((n for n in os.listdir(root) if n.endswith(suffix)), reverse=True) + except OSError: + return None + if not names: + return None + cand = os.path.realpath(os.path.join(root, names[0])) + allowed_root = root + if cand != allowed_root and not cand.startswith(allowed_root + os.sep): + return None + return cand if os.path.isdir(cand) else None + + @classmethod + def ensure_attached(cls, automation2_interface: Any, run_id: str) -> GraphicalEditorRunFileLogger | None: + """Open logger from DB, or reattach an on-disk folder for *run_id*, or create a new one.""" + opened = cls.open_from_run_record(automation2_interface, run_id) + if opened is not None: + return opened + if not graphical_editor_run_file_logging_enabled(): + return None + if not automation2_interface or not run_id: + return None + try: + run = automation2_interface.getRun(run_id) or {} + except Exception as ex: + logger.debug("GeRunFileLog: ensure getRun failed run=%s: %s", run_id, ex) + return None + prev_ctx = dict(run.get("context") or {}) + + existing_abs = cls.find_existing_absolute_dir(run_id) + if existing_abs: + base_norm = os.path.realpath(resolve_app_log_dir()) + rel = os.path.relpath(existing_abs, base_norm).replace(os.sep, "/") + merged = {**prev_ctx, CONTEXT_KEY: rel} + try: + automation2_interface.updateRun(run_id, context=merged) + except Exception as ex: + logger.warning("GeRunFileLog: reattach persist failed run=%s: %s", run_id, ex) + return None + logger.info("GeRunFileLog: reattached existing folder for run=%s -> %s", run_id, existing_abs) + return cls(run_id, existing_abs) + + subdir = cls.fresh_run_subdirectory_name(run_id) + rel = cls.relative_run_path(subdir) + base = resolve_app_log_dir() + absolute = os.path.join(base, RUN_FILE_LOG_RELATIVE_ROOT, subdir) + merged = {**prev_ctx, CONTEXT_KEY: rel} + try: + automation2_interface.updateRun(run_id, context=merged) + except Exception as ex: + logger.warning("GeRunFileLog: ensure new folder persist failed run=%s: %s", run_id, ex) + return None + logger.info("GeRunFileLog: created late attach folder %s (run=%s)", absolute, run_id) + return cls(run_id, absolute) + + async def append_node_execution_line(self, record: Dict[str, Any]) -> None: + line = json.dumps(record, ensure_ascii=False, default=str) + async with self._lock: + try: + with open(self._exec_path, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception as ex: + logger.warning("GeRunFileLog: append execution failed run=%s: %s", self._run_id, ex) + + async def append_context_snapshot_line(self, record: Dict[str, Any]) -> None: + line = json.dumps(record, ensure_ascii=False, default=str) + async with self._lock: + try: + with open(self._ctx_path, "a", encoding="utf-8") as f: + f.write(line + "\n") + except Exception as ex: + logger.warning("GeRunFileLog: append context snapshot failed run=%s: %s", self._run_id, ex) diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index 659d0ea5..758d772e 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -10,15 +10,19 @@ Returns a unified handover compatible with AiResult-style downstream wiring: ``handoverMediaDocumentName`` matching a sibling blob document. - ``documents[1:]``: each extracted image as its own binary ``ActionDocument`` (like ``ai.process`` artefact outputs). -- ``ActionResult.data["response"]`` plus normalized executor field ``response``: concatenated - plain text from all text parts — safe default for ``file.create`` / primaryTextRef.""" +- Root ``presentation`` inside the JSON (`schemaVersion`, per-file modes/lines/pages/chunks/…) + — built from filtered ``parts`` without changing extractor output. +- ``ActionResult.data["response"]`` plus normalized executor field ``response``: flat text derived + from ``presentation`` (downstream-friendly wie zuvor fuer ``file.create`` / ``primaryTextRef``).""" import base64 as _b64 import binascii as _binascii +import csv import logging import re +from io import StringIO import time -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.datamodels.datamodelDocref import coerceDocumentReferenceList @@ -32,6 +36,12 @@ HANDOVER_KIND = "context.extractContent.handover.v1" _CONTENT_FILTER_OPTIONS = ("all", "textOnly", "imagesOnly", "noImages") +PRESENTATION_SCHEMA_VERSION = 1 + +_PDF_EXTRACT_PRESENTATION_MODES = ("text", "tables", "images", "all") +_OUTPUT_MODES = ("blob", "lines", "pages", "chunks", "structured") +_SPLIT_BY_VALUES = ("newline", "paragraph", "sentence") +_CHUNK_UNITS = ("tokens", "characters", "words") def _apply_content_filter(payload: Dict[str, Any], content_filter: str) -> Dict[str, Any]: """Filter parts in the handover payload by content_filter. @@ -141,6 +151,498 @@ def _joined_text_from_handover_payload(payload: Dict[str, Any]) -> str: return "\n\n".join(chunks) +def _normalize_bool_select(value: Any, *, default: bool) -> bool: + s = str(value if value is not None else "").strip().lower() + if s in ("true", "1", "yes", "on"): + return True + if s in ("false", "0", "no", "off"): + return False + return default + + +def _parse_positive_int(value: Any, default: int) -> int: + try: + n = int(str(value).strip()) + return n if n > 0 else default + except (TypeError, ValueError): + return default + + +def _parse_non_negative_int(value: Any, default: int) -> int: + try: + n = int(str(value).strip()) + return n if n >= 0 else default + except (TypeError, ValueError): + return default + + +def parse_presentation_parameters(parameters: Dict[str, Any]) -> Dict[str, Any]: + """Defaults match ``context.extractContent`` node schema in ``context.py``.""" + output_mode = str(parameters.get("outputMode") or "lines").strip().lower() + if output_mode not in _OUTPUT_MODES: + output_mode = "lines" + split_by = str(parameters.get("splitBy") or "newline").strip().lower() + if split_by not in _SPLIT_BY_VALUES: + split_by = "newline" + chunk_unit = str(parameters.get("chunkSizeUnit") or "tokens").strip().lower() + if chunk_unit not in _CHUNK_UNITS: + chunk_unit = "tokens" + pdf_mode = str(parameters.get("pdfExtractMode") or "text").strip().lower() + if pdf_mode not in _PDF_EXTRACT_PRESENTATION_MODES: + pdf_mode = "text" + return { + "outputMode": output_mode, + "splitBy": split_by, + "chunkSizeUnit": chunk_unit, + "chunkSize": _parse_positive_int(parameters.get("chunkSize"), 500), + "chunkOverlap": _parse_non_negative_int(parameters.get("chunkOverlap"), 0), + "filterEmptyLines": _normalize_bool_select(parameters.get("filterEmptyLines"), default=True), + "trimWhitespace": _normalize_bool_select(parameters.get("trimWhitespace"), default=True), + "includeLineNumbers": _normalize_bool_select(parameters.get("includeLineNumbers"), default=False), + "includeMetadata": _normalize_bool_select(parameters.get("includeMetadata"), default=False), + "csvHeaderRow": _normalize_bool_select(parameters.get("csvHeaderRow"), default=True), + "pdfExtractMode": pdf_mode, + "markdownPreserveFormatting": _normalize_bool_select( + parameters.get("markdownPreserveFormatting"), + default=False, + ), + } + + +def _copy_part(p: Dict[str, Any]) -> Dict[str, Any]: + return dict(p) + + +def _presentation_filter_parts(parts: List[Dict[str, Any]], pdf_mode: str) -> List[Dict[str, Any]]: + """Filter **copies** of parts for the presentation layer (``pdfExtractMode``).""" + if pdf_mode == "all": + return [_copy_part(p) for p in parts if isinstance(p, dict)] + out: List[Dict[str, Any]] = [] + for p in parts: + if not isinstance(p, dict): + continue + tg = (p.get("typeGroup") or "").strip() + if pdf_mode == "text": + if tg == "image": + continue + if tg in ("text", "table", "structure"): + out.append(_copy_part(p)) + elif pdf_mode == "tables": + if tg == "table": + out.append(_copy_part(p)) + elif pdf_mode == "images": + if tg == "image": + out.append(_copy_part(p)) + return out + + +def _simplify_markdown_light(text: str) -> str: + """Cheap markdown-to-plain pass (no tokenizer library).""" + s = text + s = re.sub(r"`([^`]*)`", r"\1", s) + s = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", s) + s = re.sub(r"^#+\s*", "", s, flags=re.MULTILINE) + s = s.replace("**", "").replace("__", "") + s = re.sub(r"[*_]{1,2}([^*_]+)[*_]{1,2}", r"\1", s) + return s.strip() + + +def _apply_markdown_presentation_on_parts(parts: List[Dict[str, Any]], preserve: bool) -> None: + if preserve: + return + for p in parts: + mime = (p.get("mimeType") or "").strip().lower() + if mime != "text/markdown": + continue + raw = p.get("data") + if raw is None: + continue + p["data"] = _simplify_markdown_light(str(raw)) + + +def _part_metadata_dict(p: Dict[str, Any]) -> Dict[str, Any]: + meta = p.get("metadata") + if isinstance(meta, dict): + return dict(meta) + return {} + + +def _page_index_from_part(p: Dict[str, Any]) -> int: + meta = _part_metadata_dict(p) + pi = meta.get("pageIndex") + try: + return int(pi) if pi is not None else 0 + except (TypeError, ValueError): + return 0 + + +def _is_csv_source(source_file_name: str, parts: List[Dict[str, Any]]) -> bool: + low = (source_file_name or "").lower() + if low.endswith(".csv"): + return True + for p in parts: + if not isinstance(p, dict): + continue + mime = (p.get("mimeType") or "").strip().lower() + if mime == "text/csv" or mime.endswith("csv"): + return True + return False + + +def _csv_text_from_parts(parts: List[Dict[str, Any]]) -> Optional[str]: + """Prefer explicit CSV table part payload; else None.""" + for p in parts: + if not isinstance(p, dict): + continue + if (p.get("typeGroup") or "").strip() != "table": + continue + mime = (p.get("mimeType") or "").strip().lower() + if "csv" in mime or mime == "text/plain": + raw = p.get("data") + if raw is None: + continue + return str(raw) + for p in parts: + if not isinstance(p, dict): + continue + if (p.get("typeGroup") or "").strip() == "text": + mime = (p.get("mimeType") or "").strip().lower() + if mime == "text/csv": + raw = p.get("data") + if raw is not None: + return str(raw) + return None + + +def _parse_csv_rows(csv_text: str, header_row: bool) -> Optional[Dict[str, Any]]: + try: + reader = csv.reader(StringIO(csv_text)) + rows = [list(r) for r in reader] + except csv.Error: + return None + if not rows: + return {"headers": [], "rows": []} + if not header_row: + return {"headers": [], "rows": rows} + headers = [str(c).strip() for c in rows[0]] + body = rows[1:] + dict_rows: List[Dict[str, str]] = [] + for r in body: + item: Dict[str, str] = {} + for i, h in enumerate(headers): + key = h or f"column_{i + 1}" + item[key] = str(r[i]).strip() if i < len(r) else "" + dict_rows.append(item) + return {"headers": headers, "rows": dict_rows} + + +def _segment_merged_text(merged: str, split_by: str) -> List[str]: + if split_by == "paragraph": + return [s for s in re.split(r"\n\s*\n+", merged) if s != ""] + if split_by == "sentence": + pieces = re.split(r"(?<=[.!?])\s+", merged) + return [s for s in pieces if s.strip() != ""] + return merged.split("\n") + + +def _apply_line_filters( + segments: List[str], + *, + filter_empty: bool, + trim_ws: bool, +) -> List[str]: + out: List[str] = [] + for seg in segments: + s = seg + if trim_ws: + s = s.strip() + else: + s = str(s) + if filter_empty and (not s or not s.strip()): + continue + out.append(s) + return out + + +def _chars_per_unit(unit: str, chunk_size: int) -> int: + # Token path: rough heuristic ~4 characters per token (documented convention). + if unit == "tokens": + return max(1, chunk_size * 4) + if unit == "words": + return max(1, chunk_size * 6) + return max(1, chunk_size) + + +def _overlap_chars(unit: str, overlap: int, chunk_size: int) -> int: + return min(_chars_per_unit(unit, overlap), _chars_per_unit(unit, chunk_size)) + + +def _chunk_plain_text(text: str, cfg: Dict[str, Any]) -> List[str]: + unit = cfg["chunkSizeUnit"] + size = cfg["chunkSize"] + overlap_amount = cfg["chunkOverlap"] + if unit == "words": + words = text.split() + if not words: + return [] + out: List[str] = [] + step = max(1, size - overlap_amount) + i = 0 + while i < len(words): + chunk_words = words[i : i + size] + out.append(" ".join(chunk_words)) + if len(chunk_words) < size: + break + i += step + return out + csize = _chars_per_unit(unit, size) + ovl = min(_overlap_chars(unit, overlap_amount, size), csize - 1) if csize > 1 else 0 + if not text: + return [] + out: List[str] = [] + start = 0 + while start < len(text): + end = min(len(text), start + csize) + out.append(text[start:end]) + if end >= len(text): + break + start = max(0, end - ovl) + return out + + +def _base_item_meta( + source_file_name: str, + cfg: Dict[str, Any], + *, + segment_index: int, + offset_hint: Optional[int] = None, + page_index: Optional[int] = None, +) -> Optional[Dict[str, Any]]: + if not cfg.get("includeMetadata"): + return None + m: Dict[str, Any] = {"segmentIndex": segment_index} + if source_file_name: + m["sourceFileName"] = source_file_name + if offset_hint is not None: + m["charOffsetApprox"] = offset_hint + if page_index is not None: + m["pageIndex"] = page_index + return m + + +def presentation_response_text( + presentation: Dict[str, Any], + payload: Dict[str, Any], +) -> str: + """Derive flattened ``response`` text from ``presentation.files``.""" + + files_section = presentation.get("files") or {} + ordered = payload.get("fileOrder") + keys: List[str] = ordered if isinstance(ordered, list) and ordered else list(files_section.keys()) + chunks: List[str] = [] + for fk in keys: + bucket = files_section.get(fk) + if not isinstance(bucket, dict): + continue + mode = (bucket.get("outputMode") or "").strip() + if mode == "blob": + t = bucket.get("text") + if isinstance(t, str) and t.strip(): + chunks.append(t.strip()) + elif mode == "lines": + for it in bucket.get("items") or []: + if isinstance(it, dict): + tx = it.get("text") + if isinstance(tx, str) and tx.strip(): + chunks.append(tx.strip()) + elif mode == "pages": + for pg in bucket.get("pages") or []: + if not isinstance(pg, dict): + continue + for it in pg.get("items") or []: + if isinstance(it, dict): + tx = it.get("text") + if isinstance(tx, str) and tx.strip(): + chunks.append(tx.strip()) + elif mode == "chunks": + for it in bucket.get("chunks") or []: + if isinstance(it, dict): + tx = it.get("text") + if isinstance(tx, str) and tx.strip(): + chunks.append(tx.strip()) + elif mode == "structured": + for it in bucket.get("items") or []: + if not isinstance(it, dict): + continue + if not _part_carries_plain_text(it): + continue + tx = it.get("data") + if isinstance(tx, str) and tx.strip(): + chunks.append(tx.strip()) + return "\n\n".join(chunks) + + +def build_presentation_for_payload(payload: Dict[str, Any], cfg: Dict[str, Any]) -> Dict[str, Any]: + """Build root ``presentation`` object (does not mutate ``payload``).""" + files_section = payload.get("files") or {} + ordered = payload.get("fileOrder") + keys: List[str] = ordered if isinstance(ordered, list) and ordered else list(files_section.keys()) + out_files: Dict[str, Any] = {} + for fk in keys: + bucket = files_section.get(fk) + if not isinstance(bucket, dict): + continue + source_name = str(bucket.get("sourceFileName") or "") + raw_parts = [p for p in (bucket.get("parts") or []) if isinstance(p, dict)] + parts = _presentation_filter_parts(raw_parts, cfg["pdfExtractMode"]) + _apply_markdown_presentation_on_parts(parts, cfg["markdownPreserveFormatting"]) + out_files[fk] = _build_file_presentation(source_name, parts, cfg) + return { + "schemaVersion": PRESENTATION_SCHEMA_VERSION, + "kind": "context.extractContent.presentation.v1", + "outputMode": cfg["outputMode"], + "fileOrder": keys, + "files": out_files, + } + + +def _join_parts_plain_text(parts: List[Dict[str, Any]]) -> str: + blocks: List[str] = [] + for p in parts: + if not _part_carries_plain_text(p): + continue + raw = p.get("data") + if raw is None: + continue + s = str(raw).strip() + if s: + blocks.append(s) + return "\n\n".join(blocks) + + +def _redact_large_part_payload(p: Dict[str, Any]) -> Dict[str, Any]: + pc = dict(p) + tg = (pc.get("typeGroup") or "").strip().lower() + mime = (pc.get("mimeType") or "").strip().lower() + if tg == "image" or mime.startswith("image/"): + pc["data"] = "" + return pc + + +def _build_file_presentation( + source_file_name: str, + parts: List[Dict[str, Any]], + cfg: Dict[str, Any], +) -> Dict[str, Any]: + output_mode = cfg["outputMode"] + merge_plain = _join_parts_plain_text(parts) + + csv_block: Optional[Dict[str, Any]] = None + if _is_csv_source(source_file_name, parts): + csv_txt = _csv_text_from_parts(parts) + if csv_txt is not None: + csv_block = _parse_csv_rows(csv_txt, cfg["csvHeaderRow"]) + + base: Dict[str, Any] = { + "outputMode": output_mode, + "sourceFileName": source_file_name or None, + } + if csv_block is not None: + base["csv"] = csv_block + + if output_mode == "blob": + base["text"] = merge_plain + return base + + if output_mode == "structured": + base["items"] = [_redact_large_part_payload(_copy_part(p)) for p in parts] + return base + + if output_mode == "pages": + by_page: Dict[int, List[str]] = {} + for p in parts: + if not _part_carries_plain_text(p): + continue + raw = p.get("data") + if raw is None: + continue + s = str(raw).strip() + if not s: + continue + pi = _page_index_from_part(p) + by_page.setdefault(pi, []).append(s) + ordered_pages = sorted(by_page.keys()) + page_objs: List[Dict[str, Any]] = [] + for pi in ordered_pages: + merged = "\n\n".join(by_page[pi]) + segs = _segment_merged_text(merged, cfg["splitBy"]) + segs = _apply_line_filters( + segs, + filter_empty=cfg["filterEmptyLines"], + trim_ws=cfg["trimWhitespace"], + ) + items: List[Dict[str, Any]] = [] + offset = 0 + for idx, seg in enumerate(segs, start=1): + meta = _base_item_meta( + source_file_name, + cfg, + segment_index=idx, + offset_hint=offset, + page_index=pi, + ) + row: Dict[str, Any] = {"text": seg} + if cfg["includeLineNumbers"]: + row["lineNumber"] = idx + if meta: + row["metadata"] = meta + items.append(row) + offset += len(seg) + 1 + page_objs.append({"pageIndex": pi, "items": items}) + base["pages"] = page_objs + return base + + if output_mode == "chunks": + segs = _segment_merged_text(merge_plain, cfg["splitBy"]) + segs = _apply_line_filters( + segs, + filter_empty=cfg["filterEmptyLines"], + trim_ws=cfg["trimWhitespace"], + ) + flat = "\n".join(segs) + chunk_texts = _chunk_plain_text(flat, cfg) + chunk_objs: List[Dict[str, Any]] = [] + for idx, ct in enumerate(chunk_texts, start=1): + meta = _base_item_meta(source_file_name, cfg, segment_index=idx) + row: Dict[str, Any] = {"index": idx, "text": ct} + if meta: + row["metadata"] = meta + chunk_objs.append(row) + base["chunks"] = chunk_objs + return base + + # lines (default): shared path with pages/chunks splitting + segs = _segment_merged_text(merge_plain, cfg["splitBy"]) + segs = _apply_line_filters( + segs, + filter_empty=cfg["filterEmptyLines"], + trim_ws=cfg["trimWhitespace"], + ) + items: List[Dict[str, Any]] = [] + offset = 0 + for idx, seg in enumerate(segs, start=1): + meta = _base_item_meta(source_file_name, cfg, segment_index=idx, offset_hint=offset) + row = {"text": seg} + if cfg["includeLineNumbers"]: + row["lineNumber"] = idx + if meta: + row["metadata"] = meta + items.append(row) + offset += len(seg) + 1 + base["items"] = items + return base + + def _mime_to_file_extension(mime: str) -> str: m = (mime or "").split(";")[0].strip().lower() mapping = { @@ -364,6 +866,9 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: content_filter = "all" payload = _apply_content_filter(payload, content_filter) + pres_cfg = parse_presentation_parameters(parameters) + presentation = build_presentation_for_payload(payload, pres_cfg) + stem = f"{wf}_{int(time.time())}" # Only split image sidecars when the filtered payload can still contain image parts. if content_filter in ("all", "imagesOnly"): @@ -376,7 +881,8 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: stripped_payload = payload media_docs = [] - joined_text = _joined_text_from_handover_payload(payload) + stripped_payload["presentation"] = presentation + joined_text = presentation_response_text(presentation, stripped_payload) json_meta = { "actionType": "context.extractContent", diff --git a/modules/workflows/methods/methodContext/actions/setContext.py b/modules/workflows/methods/methodContext/actions/setContext.py index 7d54a719..10f292b7 100644 --- a/modules/workflows/methods/methodContext/actions/setContext.py +++ b/modules/workflows/methods/methodContext/actions/setContext.py @@ -320,18 +320,25 @@ def _pause_for_human_tasks( ) task_id = str((task or {}).get("id") or "") ordered_ids = [n.get("id") for n in (run_context.get("_orderedNodes") or []) if n.get("id")] - iface.updateRun( + from modules.workflows.automation2.graphicalEditorRunFileLogger import merge_persisted_run_context + + _pause_ctx = merge_persisted_run_context( + iface, run_id, - status="paused", - nodeOutputs=run_context.get("nodeOutputs"), - currentNodeId=node_id, - context={ + { "connectionMap": run_context.get("connectionMap"), "inputSources": run_context.get("inputSources"), "orderedNodeIds": ordered_ids, "pauseReason": "contextAssignment", }, ) + iface.updateRun( + run_id, + status="paused", + nodeOutputs=run_context.get("nodeOutputs"), + currentNodeId=node_id, + context=_pause_ctx, + ) if not (run_id and task_id and node_id): raise RuntimeError("humanTask requires _runId, task id, and _workflowNodeId") raise PauseForHumanTaskError(runId=run_id, taskId=task_id, nodeId=node_id) diff --git a/tests/unit/workflow/test_extract_content_handover.py b/tests/unit/workflow/test_extract_content_handover.py index f393c0ea..e9a71636 100644 --- a/tests/unit/workflow/test_extract_content_handover.py +++ b/tests/unit/workflow/test_extract_content_handover.py @@ -7,6 +7,9 @@ from modules.workflows.methods.methodContext.actions.extractContent import ( _apply_content_filter, _joined_text_from_handover_payload, _split_images_to_sidecar_documents, + build_presentation_for_payload, + parse_presentation_parameters, + presentation_response_text, ) @@ -142,3 +145,116 @@ def test_content_filter_text_only_no_sidecars(): result = _apply_content_filter(_mixed_payload(), "textOnly") stripped, blobs = _split_images_to_sidecar_documents(result, document_name_stem="test") assert blobs == [] + + +def test_presentation_lines_and_response(): + payload = { + "kind": HANDOVER_KIND, + "fileOrder": ["f1"], + "files": { + "f1": { + "sourceFileName": "x.txt", + "parts": [ + {"typeGroup": "text", "data": "a\n\nb", "id": "1"}, + ], + }, + }, + } + cfg = parse_presentation_parameters({"outputMode": "lines", "splitBy": "paragraph"}) + pres = build_presentation_for_payload(payload, cfg) + assert pres["files"]["f1"]["outputMode"] == "lines" + assert [it["text"] for it in pres["files"]["f1"]["items"]] == ["a", "b"] + assert presentation_response_text(pres, payload) == "a\n\nb" + + +def test_presentation_pdf_mode_tables_only(): + payload = { + "fileOrder": ["f1"], + "files": { + "f1": { + "sourceFileName": "d.pdf", + "parts": [ + {"typeGroup": "text", "data": "t", "id": "a"}, + {"typeGroup": "table", "mimeType": "text/csv", "data": "h1,h2\n1,2", "id": "b"}, + ], + }, + }, + } + cfg = parse_presentation_parameters({"pdfExtractMode": "tables", "outputMode": "blob"}) + pres = build_presentation_for_payload(payload, cfg) + assert pres["files"]["f1"]["text"] == "h1,h2\n1,2" + + +def test_presentation_csv_rows(): + payload = { + "fileOrder": ["f1"], + "files": { + "f1": { + "sourceFileName": "f.csv", + "parts": [{"typeGroup": "table", "mimeType": "text/csv", "data": "a,b\n1,2", "id": "t"}], + }, + }, + } + cfg = parse_presentation_parameters({"csvHeaderRow": "true"}) + pres = build_presentation_for_payload(payload, cfg) + csv = pres["files"]["f1"]["csv"] + assert csv["headers"] == ["a", "b"] + assert csv["rows"] == [{"a": "1", "b": "2"}] + + +def test_presentation_pages_groups_by_page_index(): + payload = { + "fileOrder": ["f1"], + "files": { + "f1": { + "sourceFileName": "p.pdf", + "parts": [ + {"typeGroup": "text", "data": "p0", "metadata": {"pageIndex": 0}, "id": "a"}, + {"typeGroup": "text", "data": "p1a\np1b", "metadata": {"pageIndex": 1}, "id": "b"}, + ], + }, + }, + } + cfg = parse_presentation_parameters({"outputMode": "pages", "splitBy": "newline"}) + pres = build_presentation_for_payload(payload, cfg) + pages = pres["files"]["f1"]["pages"] + assert [(p["pageIndex"], [it["text"] for it in p["items"]]) for p in pages] == [ + (0, ["p0"]), + (1, ["p1a", "p1b"]), + ] + + +def test_presentation_chunks_with_overlap_chars(): + payload = { + "fileOrder": ["f1"], + "files": {"f1": {"sourceFileName": "t.txt", "parts": [{"typeGroup": "text", "data": "abcdefghij", "id": "a"}]}}, + } + cfg = parse_presentation_parameters( + {"outputMode": "chunks", "chunkSizeUnit": "characters", "chunkSize": "4", "chunkOverlap": "2"} + ) + pres = build_presentation_for_payload(payload, cfg) + texts = [c["text"] for c in pres["files"]["f1"]["chunks"]] + assert texts == ["abcd", "cdef", "efgh", "ghij"] + + +def test_presentation_stripped_payload_gains_presentation_key_after_split(): + raw = b"x" + b64 = base64.b64encode(raw).decode("ascii") + payload = { + "kind": HANDOVER_KIND, + "schemaVersion": 1, + "fileOrder": ["f1"], + "files": { + "f1": { + "parts": [ + {"typeGroup": "text", "data": "txt", "id": "t"}, + {"typeGroup": "image", "mimeType": "image/png", "data": b64, "id": "img"}, + ] + } + }, + } + pres = build_presentation_for_payload(payload, parse_presentation_parameters({})) + stripped, _blobs = _split_images_to_sidecar_documents(payload, document_name_stem="s") + stripped["presentation"] = pres + assert "presentation" in stripped + assert stripped["presentation"]["files"]["f1"]["items"] diff --git a/tests/unit/workflow/test_phase3_context_node.py b/tests/unit/workflow/test_phase3_context_node.py index 3f055ca3..07496025 100644 --- a/tests/unit/workflow/test_phase3_context_node.py +++ b/tests/unit/workflow/test_phase3_context_node.py @@ -25,7 +25,25 @@ def test_context_extractContent_node_shape(): assert "DocumentList" in node["inputPorts"][0]["accepts"] assert "LoopItem" in node["inputPorts"][0]["accepts"] names = [p["name"] for p in node["parameters"]] - assert names == ["documentList", "contentFilter"] + assert names == [ + "documentList", + "contentFilter", + "outputMode", + "splitBy", + "chunkSizeUnit", + "chunkSize", + "chunkOverlap", + "filterEmptyLines", + "trimWhitespace", + "includeLineNumbers", + "includeMetadata", + "csvHeaderRow", + "pdfExtractMode", + "markdownPreserveFormatting", + ] + + pick_paths = [opt["path"] for opt in node["outputPorts"][0]["dataPickOptions"]] + assert ["documents", 0, "documentData", "presentation"] in pick_paths def test_udm_port_types_registered():