# Copyright (c) 2026 Patrick Motsch
# All rights reserved.
"""T3 — Unit tests for the workflow-CRUD agent tools.

Covers AC 5 + AC 6 of the PWG-Pilot plan:
- createWorkflow happy-path returns a workflowId.
- createWorkflow rejects missing label / instanceId.
- deleteWorkflow without ``confirm=true`` is a NO-OP and returns an error.
- deleteWorkflow with ``confirm=true`` deletes and returns success.
- updateWorkflowMetadata patches only the supplied fields.
- createWorkflowFromFile / exportWorkflowToFile happy-path round-trip.

The tools call into a feature-instance interface; we replace
``workflowTools._getInterface`` with a fake that captures interactions
without touching any database.
"""

import asyncio
import json
import uuid
from typing import Any, Dict, List, Optional, Tuple

import pytest

from modules.serviceCenter.services.serviceAgent import workflowTools
from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


class _FakeInterface:
    """In-memory stand-in for ``GraphicalEditorObjects``.

    Stores workflows by id in ``self.workflows``. The mutating entry points
    (``createWorkflow``, ``updateWorkflow``, ``deleteWorkflow``,
    ``importWorkflowFromDict``) append a tuple to ``self.calls`` so tests can
    assert on the parameters the tool layer forwarded. The read-only methods
    (``getWorkflow``, ``exportWorkflowToDict``) are deliberately not recorded.
    """

    def __init__(self, mandateId: str = "mand-1", featureInstanceId: str = "inst-1"):
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        # workflowId -> stored workflow record
        self.workflows: Dict[str, Dict[str, Any]] = {}
        # One tuple per recorded call: (methodName, *arguments)
        self.calls: List[Tuple[Any, ...]] = []

    def createWorkflow(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Store a shallow copy of ``data`` as a new workflow and return it.

        The id is taken from ``data["id"]`` when present, otherwise a fresh
        UUID4 string is generated. ``mandateId`` / ``featureInstanceId`` are
        always overwritten with the fake's own values.
        """
        self.calls.append(("createWorkflow", data))
        wfId = data.get("id") or str(uuid.uuid4())
        record = dict(data)
        record["id"] = wfId
        record["mandateId"] = self.mandateId
        record["featureInstanceId"] = self.featureInstanceId
        # Only fills in the flag when the caller did not supply one.
        record.setdefault("active", False)
        self.workflows[wfId] = record
        return record

    def updateWorkflow(self, workflowId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Merge ``data`` into the stored record; return ``None`` if unknown."""
        self.calls.append(("updateWorkflow", workflowId, data))
        existing = self.workflows.get(workflowId)
        # Identity check (not truthiness) so an empty record still counts as
        # existing, matching the semantics of ``deleteWorkflow``.
        if existing is None:
            return None
        existing.update(data)
        return existing

    def deleteWorkflow(self, workflowId: str) -> bool:
        """Remove the workflow; return ``True`` only if it was stored."""
        self.calls.append(("deleteWorkflow", workflowId))
        return self.workflows.pop(workflowId, None) is not None

    def getWorkflow(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """Return the stored record or ``None``; not recorded in ``calls``."""
        return self.workflows.get(workflowId)

    def importWorkflowFromDict(
        self,
        envelope: Dict[str, Any],
        existingWorkflowId: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Create or overwrite a workflow from an export envelope.

        Returns a dict with the keys ``workflow``, ``warnings`` (always an
        empty list here) and ``created``. When ``existingWorkflowId`` is
        truthy the existing record is updated and ``created`` is ``False``;
        otherwise a new workflow is created and ``created`` is ``True``.
        """
        self.calls.append(("importWorkflowFromDict", envelope, existingWorkflowId))
        data = {
            "label": envelope.get("label", "Imported"),
            "description": envelope.get("description", ""),
            "tags": envelope.get("tags", []),
            "graph": envelope.get("graph", {"nodes": [], "connections": []}),
            "invocations": envelope.get("invocations", []),
            "active": False,
        }
        if existingWorkflowId:
            # An unknown id yields an empty workflow dict rather than an error.
            updated = self.updateWorkflow(existingWorkflowId, data) or {}
            return {"workflow": updated, "warnings": [], "created": False}
        created = self.createWorkflow(data)
        return {"workflow": created, "warnings": [], "created": True}

    def exportWorkflowToDict(self, workflowId: str) -> Optional[Dict[str, Any]]:
        """Build an export envelope for the workflow; ``None`` if unknown.

        Not recorded in ``calls``.
        """
        wf = self.workflows.get(workflowId)
        # Identity check keeps "exists" consistent with ``deleteWorkflow``.
        if wf is None:
            return None
        return {
            "$schemaVersion": "1.0",
            "$kind": "poweron.workflow",
            "label": wf.get("label"),
            "description": wf.get("description", ""),
            "tags": wf.get("tags", []),
            "graph": wf.get("graph") or {"nodes": [], "connections": []},
            "invocations": wf.get("invocations") or [],
        }


@pytest.fixture
def fakeInterface(monkeypatch):
    """Replace ``_getInterface`` with a fixture-scoped fake."""
    fake = _FakeInterface()
    monkeypatch.setattr(workflowTools, "_getInterface", lambda _ctx, _iid: fake)
    return fake


def _ctx(workflowId: str = "wf-1", instanceId: str = "inst-1") -> Dict[str, Any]:
    """Standard agent-tool context dict."""
    return {
        "workflowId": workflowId,
        "featureInstanceId": instanceId,
        "userId": "user-1",
        "mandateId": "mand-1",
    }


def _runTool(handler, params: Dict[str, Any], context: Dict[str, Any]) -> ToolResult:
    """Drive ``handler(params, context)`` to completion with ``asyncio.run``."""
    return asyncio.run(handler(params, context))


def _payload(result: ToolResult) -> Dict[str, Any]:
    """Decode the tool's data string back into a dict for easy asserts."""
    assert isinstance(result.data, str), "ToolResult.data must be a string per registry contract"
    return json.loads(result.data)


# ---------------------------------------------------------------------------
# createWorkflow — AC 5
# ---------------------------------------------------------------------------


class TestCreateWorkflow:
    """Tests for ``workflowTools._createWorkflow``."""

    def test_happyPathReturnsWorkflowId(self, fakeInterface):
        result = _runTool(workflowTools._createWorkflow, {"label": "Smoke-Test"}, _ctx())
        assert result.success, result.error
        payload = _payload(result)
        assert payload["workflowId"]
        assert payload["label"] == "Smoke-Test"
        assert payload["workflowId"] in fakeInterface.workflows
        assert fakeInterface.workflows[payload["workflowId"]]["active"] is False

    def test_missingLabelIsRejected(self, fakeInterface):
        result = _runTool(workflowTools._createWorkflow, {}, _ctx())
        assert not result.success
        assert "label" in (result.error or "").lower()
        assert fakeInterface.calls == [], "no DB call must happen on validation error"

    def test_missingInstanceIdIsRejected(self, fakeInterface):
        # Context without ``featureInstanceId``.
        ctx = {"workflowId": "wf-1", "userId": "user-1", "mandateId": "mand-1"}
        result = _runTool(workflowTools._createWorkflow, {"label": "Empty"}, ctx)
        assert not result.success
        assert "instanceid" in (result.error or "").lower()

    def test_blankLabelIsRejected(self, fakeInterface):
        result = _runTool(workflowTools._createWorkflow, {"label": " "}, _ctx())
        assert not result.success

    def test_initialGraphAndTagsAreForwarded(self, fakeInterface):
        graph = {"nodes": [{"id": "n1", "type": "trigger.manual"}], "connections": []}
        result = _runTool(
            workflowTools._createWorkflow,
            {"label": "With Graph", "tags": ["pwg"], "graph": graph, "description": "d"},
            _ctx(),
        )
        assert result.success
        wfId = _payload(result)["workflowId"]
        stored = fakeInterface.workflows[wfId]
        assert stored["tags"] == ["pwg"]
        assert stored["description"] == "d"
        assert stored["graph"]["nodes"][0]["id"] == "n1"


# ---------------------------------------------------------------------------
# deleteWorkflow — AC 6
# ---------------------------------------------------------------------------


class TestDeleteWorkflow:
    """Tests for ``workflowTools._deleteWorkflow`` and its ``confirm`` guard."""

    def test_withoutConfirmReturnsError(self, fakeInterface):
        fakeInterface.workflows["wf-x"] = {"id": "wf-x", "label": "L"}
        result = _runTool(workflowTools._deleteWorkflow, {"workflowId": "wf-x"}, _ctx())
        assert not result.success
        assert "confirm" in (result.error or "").lower()
        # Critical: no destructive call must reach the interface
        assert all(call[0] != "deleteWorkflow" for call in fakeInterface.calls)
        assert "wf-x" in fakeInterface.workflows

    def test_withConfirmFalseAlsoBlocks(self, fakeInterface):
        fakeInterface.workflows["wf-x"] = {"id": "wf-x", "label": "L"}
        result = _runTool(
            workflowTools._deleteWorkflow,
            {"workflowId": "wf-x", "confirm": False},
            _ctx(),
        )
        assert not result.success
        assert "wf-x" in fakeInterface.workflows

    def test_withConfirmTrueDeletes(self, fakeInterface):
        fakeInterface.workflows["wf-x"] = {"id": "wf-x", "label": "L"}
        result = _runTool(
            workflowTools._deleteWorkflow,
            {"workflowId": "wf-x", "confirm": True},
            _ctx(),
        )
        assert result.success, result.error
        assert "wf-x" not in fakeInterface.workflows

    def test_unknownWorkflowReturnsError(self, fakeInterface):
        result = _runTool(
            workflowTools._deleteWorkflow,
            {"workflowId": "wf-ghost", "confirm": True},
            _ctx(),
        )
        assert not result.success
        assert "not found" in (result.error or "").lower()

    def test_missingIdsReturnError(self, fakeInterface):
        # Neither params nor context carry a workflow / instance id.
        result = _runTool(
            workflowTools._deleteWorkflow,
            {"confirm": True},
            {"userId": "user-1", "mandateId": "mand-1"},
        )
        assert not result.success
        assert "required" in (result.error or "").lower()


# ---------------------------------------------------------------------------
# updateWorkflowMetadata — supports the "rename" intent without touching graph
# ---------------------------------------------------------------------------


class TestUpdateWorkflowMetadata:
    """Tests for ``workflowTools._updateWorkflowMetadata``."""

    def test_renameOnlyTouchesLabel(self, fakeInterface):
        fakeInterface.workflows["wf-1"] = {
            "id": "wf-1",
            "label": "Old Name",
            "graph": {"nodes": [{"id": "n1"}], "connections": []},
        }
        result = _runTool(
            workflowTools._updateWorkflowMetadata,
            {"workflowId": "wf-1", "label": "New Name"},
            _ctx(),
        )
        assert result.success, result.error
        payload = _payload(result)
        assert payload["label"] == "New Name"
        assert payload["changed"] == ["label"]
        # Graph must remain untouched
        stored = fakeInterface.workflows["wf-1"]
        assert stored["graph"]["nodes"][0]["id"] == "n1"

    def test_emptyPatchIsRejected(self, fakeInterface):
        fakeInterface.workflows["wf-1"] = {"id": "wf-1", "label": "L"}
        result = _runTool(
            workflowTools._updateWorkflowMetadata,
            {"workflowId": "wf-1"},
            _ctx(),
        )
        assert not result.success

    def test_blankLabelIsRejected(self, fakeInterface):
        fakeInterface.workflows["wf-1"] = {"id": "wf-1", "label": "L"}
        result = _runTool(
            workflowTools._updateWorkflowMetadata,
            {"workflowId": "wf-1", "label": " "},
            _ctx(),
        )
        assert not result.success


# ---------------------------------------------------------------------------
# createWorkflowFromFile / exportWorkflowToFile — round-trip via the tool layer
# ---------------------------------------------------------------------------


class TestImportExportTools:
    """Tests for ``_createWorkflowFromFile`` and ``_exportWorkflowToFile``."""

    def test_inlineEnvelopeImportCreatesWorkflow(self, fakeInterface):
        envelope = {
            "$schemaVersion": "1.0",
            "label": "Imported PWG",
            "graph": {"nodes": [{"id": "n1", "type": "trigger.manual"}], "connections": []},
        }
        result = _runTool(
            workflowTools._createWorkflowFromFile,
            {"envelope": envelope},
            _ctx(),
        )
        assert result.success, result.error
        payload = _payload(result)
        assert payload["workflowId"]
        assert payload["created"] is True
        assert payload["label"] == "Imported PWG"
        assert fakeInterface.workflows[payload["workflowId"]]["active"] is False

    def test_importRequiresFileIdOrEnvelope(self, fakeInterface):
        result = _runTool(
            workflowTools._createWorkflowFromFile,
            {},
            _ctx(),
        )
        assert not result.success
        # Either missing input may be named in the error message.
        errorText = (result.error or "").lower()
        assert "fileid" in errorText or "envelope" in errorText

    def test_existingWorkflowIdReplacesGraph(self, fakeInterface):
        fakeInterface.workflows["wf-1"] = {
            "id": "wf-1",
            "label": "Existing",
            "graph": {"nodes": [], "connections": []},
        }
        envelope = {
            "$schemaVersion": "1.0",
            "label": "Replaced",
            "graph": {"nodes": [{"id": "n2", "type": "trigger.manual"}], "connections": []},
        }
        result = _runTool(
            workflowTools._createWorkflowFromFile,
            {"envelope": envelope, "existingWorkflowId": "wf-1"},
            _ctx(),
        )
        assert result.success, result.error
        payload = _payload(result)
        assert payload["created"] is False
        assert fakeInterface.workflows["wf-1"]["graph"]["nodes"][0]["id"] == "n2"

    def test_exportProducesEnvelopeWithSchemaVersion(self, fakeInterface):
        fakeInterface.workflows["wf-1"] = {
            "id": "wf-1",
            "label": "Round-Trip",
            "graph": {"nodes": [{"id": "n1", "type": "trigger.manual"}], "connections": []},
        }
        result = _runTool(
            workflowTools._exportWorkflowToFile,
            {"workflowId": "wf-1"},
            _ctx(),
        )
        assert result.success, result.error
        payload = _payload(result)
        assert payload["fileName"].endswith(".workflow.json")
        assert payload["schemaVersion"] == "1.0"
        envelope = payload["envelope"]
        assert envelope["label"] == "Round-Trip"
        assert envelope["$kind"] == "poweron.workflow"

    def test_exportUnknownWorkflowReturnsError(self, fakeInterface):
        result = _runTool(
            workflowTools._exportWorkflowToFile,
            {"workflowId": "wf-ghost"},
            _ctx(),
        )
        assert not result.success
        assert "not found" in (result.error or "").lower()


# ---------------------------------------------------------------------------
# Tool definitions — make sure the new tools are registered with the toolbox
# (cheap regression test that a refactor doesn't drop one of them silently)
# ---------------------------------------------------------------------------


class TestToolDefinitions:
    """Checks on ``workflowTools.getWorkflowToolDefinitions()``."""

    def test_allCrudToolsAreRegistered(self):
        defs = workflowTools.getWorkflowToolDefinitions()
        names = {d["name"] for d in defs}
        for required in (
            "createWorkflow",
            "createWorkflowFromFile",
            "exportWorkflowToFile",
            "deleteWorkflow",
            "updateWorkflowMetadata",
        ):
            assert required in names, f"{required} missing from workflow toolbox"

    def test_deleteWorkflowMarksConfirmRequired(self):
        defs = {d["name"]: d for d in workflowTools.getWorkflowToolDefinitions()}
        deleteSpec = defs["deleteWorkflow"]
        params = deleteSpec.get("parameters", {})
        assert "confirm" in (params.get("required") or []), (
            "deleteWorkflow must declare confirm as required so the model "
            "cannot accidentally call it without an explicit confirmation."
        )