From 6a80c86d51b2b1e3bf26bf27165b0380ea63f2d0 Mon Sep 17 00:00:00 2001 From: retoor Date: Sat, 8 Nov 2025 03:44:42 +0100 Subject: [PATCH] feat: add workflow tests feat: implement workflow storage tests feat: add workflow step tests --- CHANGELOG.md | 8 + pyproject.toml | 2 +- tests/test_workflows.py | 560 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 569 insertions(+), 1 deletion(-) create mode 100644 tests/test_workflows.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 634cf37..24d38da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,14 @@ + + +## Version 1.40.0 - 2025-11-08 + +The project has been updated to version 1.40.0. Tests are now run in a virtual environment to improve reliability. + +**Changes:** 3 files, 11 lines +**Languages:** Markdown (8 lines), TOML (2 lines), YAML (1 lines) ## Version 1.39.0 - 2025-11-08 diff --git a/pyproject.toml b/pyproject.toml index b65650b..ccd322f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "rp" -version = "1.39.0" +version = "1.40.0" description = "R python edition. The ultimate autonomous AI CLI." readme = "README.md" requires-python = ">=3.10" diff --git a/tests/test_workflows.py b/tests/test_workflows.py new file mode 100644 index 0000000..b68831a --- /dev/null +++ b/tests/test_workflows.py @@ -0,0 +1,560 @@ +import pytest +import tempfile +import os +from rp.workflows.workflow_definition import ExecutionMode, Workflow, WorkflowStep +from rp.workflows.workflow_storage import WorkflowStorage +from rp.workflows.workflow_engine import WorkflowExecutionContext, WorkflowEngine + + +class TestWorkflowStep: + def test_init(self): + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg1": "value1"}, + step_id="step1", + condition="True", + on_success=["step2"], + on_failure=["step3"], + retry_count=2, + timeout_seconds=600 + ) + assert step.tool_name == "test_tool" + assert step.arguments == {"arg1": "value1"} + assert step.step_id == "step1" + assert step.condition == "True" + assert step.on_success == ["step2"] + assert step.on_failure == ["step3"] + assert step.retry_count == 2 + assert step.timeout_seconds == 600 + + def test_to_dict(self): + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg1": "value1"}, + step_id="step1" + ) + expected = { + "tool_name": "test_tool", + "arguments": {"arg1": "value1"}, + "step_id": "step1", + "condition": None, + "on_success": None, + "on_failure": None, + "retry_count": 0, + "timeout_seconds": 300, + } + assert step.to_dict() == expected + + def test_from_dict(self): + data = { + "tool_name": "test_tool", + "arguments": {"arg1": "value1"}, + "step_id": "step1", + "condition": "True", + "on_success": ["step2"], + "on_failure": ["step3"], + "retry_count": 2, + "timeout_seconds": 600, + } + step = WorkflowStep.from_dict(data) + assert step.tool_name == "test_tool" + assert step.arguments == {"arg1": "value1"} + assert step.step_id == "step1" + assert step.condition == "True" + assert step.on_success == ["step2"] + assert step.on_failure == ["step3"] + assert step.retry_count == 2 + assert step.timeout_seconds == 600 + + +class TestWorkflow: + def test_init(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1, step2], + execution_mode=ExecutionMode.PARALLEL, + variables={"var1": "value1"}, + tags=["tag1", "tag2"] + ) + assert workflow.name == "test_workflow" + assert workflow.description == "A test workflow" + assert workflow.steps == [step1, step2] + assert workflow.execution_mode == ExecutionMode.PARALLEL + assert workflow.variables == {"var1": "value1"} + assert workflow.tags == ["tag1", "tag2"] + + def test_to_dict(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1], + execution_mode=ExecutionMode.SEQUENTIAL, + variables={"var1": "value1"}, + tags=["tag1"] + ) + expected = { + "name": "test_workflow", + "description": "A test workflow", + "steps": [step1.to_dict()], + "execution_mode": "sequential", + "variables": {"var1": "value1"}, + "tags": ["tag1"], + } + assert workflow.to_dict() == expected + + def test_from_dict(self): + data = { + "name": "test_workflow", + "description": "A test workflow", + "steps": [{ + "tool_name": "tool1", + "arguments": {}, + "step_id": "step1", + "condition": None, + "on_success": None, + "on_failure": None, + "retry_count": 0, + "timeout_seconds": 300, + }], + "execution_mode": "parallel", + "variables": {"var1": "value1"}, + "tags": ["tag1"], + } + workflow = Workflow.from_dict(data) + assert workflow.name == "test_workflow" + assert workflow.description == "A test workflow" + assert len(workflow.steps) == 1 + assert workflow.steps[0].tool_name == "tool1" + assert workflow.execution_mode == ExecutionMode.PARALLEL + assert workflow.variables == {"var1": "value1"} + assert workflow.tags == ["tag1"] + + def test_add_step(self): + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[] + ) + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow.add_step(step) + assert workflow.steps == [step] + + def test_get_step(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1, step2] + ) + assert workflow.get_step("step1") == step1 + assert workflow.get_step("step2") == step2 + assert workflow.get_step("nonexistent") is None + + def test_get_initial_steps_sequential(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1, step2], + execution_mode=ExecutionMode.SEQUENTIAL + ) + assert workflow.get_initial_steps() == [step1] + + def test_get_initial_steps_parallel(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1, step2], + execution_mode=ExecutionMode.PARALLEL + ) + assert workflow.get_initial_steps() == [step1, step2] + + def test_get_initial_steps_conditional(self): + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1", condition="True") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step1, step2], + execution_mode=ExecutionMode.CONDITIONAL + ) + assert workflow.get_initial_steps() == [step2] # Only step without condition + + +class TestWorkflowStorage: + def setup_method(self): + self.temp_db = tempfile.NamedTemporaryFile(delete=False) + self.temp_db.close() + self.storage = WorkflowStorage(self.temp_db.name) + + def teardown_method(self): + os.unlink(self.temp_db.name) + + def test_save_and_load_workflow(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + workflow_id = self.storage.save_workflow(workflow) + loaded = self.storage.load_workflow(workflow_id) + assert loaded is not None + assert loaded.name == "test_workflow" + assert loaded.description == "A test workflow" + assert len(loaded.steps) == 1 + assert loaded.steps[0].tool_name == "tool1" + + def test_load_workflow_not_found(self): + loaded = self.storage.load_workflow("nonexistent") + assert loaded is None + + def test_load_workflow_by_name(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + self.storage.save_workflow(workflow) + loaded = self.storage.load_workflow_by_name("test_workflow") + assert loaded is not None + assert loaded.name == "test_workflow" + + def test_load_workflow_by_name_not_found(self): + loaded = self.storage.load_workflow_by_name("nonexistent") + assert loaded is None + + def test_list_workflows(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step], + tags=["tag1"] + ) + self.storage.save_workflow(workflow) + workflows = self.storage.list_workflows() + assert len(workflows) == 1 + assert workflows[0]["name"] == "test_workflow" + assert workflows[0]["tags"] == ["tag1"] + + def test_list_workflows_with_tag(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow1 = Workflow( + name="test_workflow1", + description="A test workflow", + steps=[step], + tags=["tag1"] + ) + workflow2 = Workflow( + name="test_workflow2", + description="A test workflow", + steps=[step], + tags=["tag2"] + ) + self.storage.save_workflow(workflow1) + self.storage.save_workflow(workflow2) + workflows = self.storage.list_workflows("tag1") + assert len(workflows) == 1 + assert workflows[0]["name"] == "test_workflow1" + + def test_delete_workflow(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + workflow_id = self.storage.save_workflow(workflow) + deleted = self.storage.delete_workflow(workflow_id) + assert deleted is True + loaded = self.storage.load_workflow(workflow_id) + assert loaded is None + + def test_delete_workflow_not_found(self): + deleted = self.storage.delete_workflow("nonexistent") + assert deleted is False + + def test_save_execution(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + workflow_id = self.storage.save_workflow(workflow) + context = WorkflowExecutionContext() + context.set_step_result("step1", "result") + context.log_event("completed", "step1", {}) + execution_id = self.storage.save_execution(workflow_id, context) + assert execution_id is not None + history = self.storage.get_execution_history(workflow_id) + assert len(history) == 1 + assert history[0]["execution_id"] == execution_id + + def test_get_execution_history(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + workflow_id = self.storage.save_workflow(workflow) + context = WorkflowExecutionContext() + context.set_step_result("step1", "result") + context.log_event("completed", "step1", {}) + self.storage.save_execution(workflow_id, context) + history = self.storage.get_execution_history(workflow_id) + assert len(history) == 1 + + def test_get_execution_history_limit(self): + step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + workflow = Workflow( + name="test_workflow", + description="A test workflow", + steps=[step] + ) + workflow_id = self.storage.save_workflow(workflow) + for i in range(5): + context = WorkflowExecutionContext() + context.set_step_result("step1", f"result{i}") + context.log_event("completed", "step1", {}) + self.storage.save_execution(workflow_id, context) + history = self.storage.get_execution_history(workflow_id, limit=3) + assert len(history) == 3 + + +class TestWorkflowExecutionContext: + def test_init(self): + context = WorkflowExecutionContext() + assert context.variables == {} + assert context.step_results == {} + assert context.execution_log == [] + + def test_set_variable(self): + context = WorkflowExecutionContext() + context.set_variable("var1", "value1") + assert context.get_variable("var1") == "value1" + assert context.get_variable("nonexistent") is None + assert context.get_variable("nonexistent", "default") == "default" + + def test_set_step_result(self): + context = WorkflowExecutionContext() + context.set_step_result("step1", "result1") + assert context.get_step_result("step1") == "result1" + + def test_log_event(self): + context = WorkflowExecutionContext() + context.log_event("test_event", "step1", {"detail": "value"}) + assert len(context.execution_log) == 1 + log_entry = context.execution_log[0] + assert log_entry["event_type"] == "test_event" + assert log_entry["step_id"] == "step1" + assert log_entry["details"] == {"detail": "value"} + assert "timestamp" in log_entry + + +class TestWorkflowEngine: + def test_init(self): + def tool_executor(tool_name, args): + return f"executed {tool_name} with {args}" + engine = WorkflowEngine(tool_executor, max_workers=10) + assert engine.tool_executor == tool_executor + assert engine.max_workers == 10 + + def test_evaluate_condition_true(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + assert engine._evaluate_condition("True", context) is True + + def test_evaluate_condition_false(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + assert engine._evaluate_condition("False", context) is False + + def test_evaluate_condition_with_variables(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + context.set_variable("test_var", "test_value") + assert engine._evaluate_condition("variables['test_var'] == 'test_value'", context) is True + + def test_substitute_variables(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + context.set_variable("var1", "value1") + context.set_step_result("step1", "result1") + arguments = { + "arg1": "${var.var1}", + "arg2": "${step.step1}", + "arg3": "plain_value" + } + substituted = engine._substitute_variables(arguments, context) + assert substituted["arg1"] == "value1" + assert substituted["arg2"] == "result1" + assert substituted["arg3"] == "plain_value" + + def test_execute_step_success(self): + executed = [] + def tool_executor(tool_name, args): + executed.append((tool_name, args)) + return "success_result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg": "value"}, + step_id="step1" + ) + result = engine._execute_step(step, context) + assert result["status"] == "success" + assert result["step_id"] == "step1" + assert result["result"] == "success_result" + assert executed == [("test_tool", {"arg": "value"})] + assert context.get_step_result("step1") == "success_result" + + def test_execute_step_skipped(self): + executed = [] + def tool_executor(tool_name, args): + executed.append((tool_name, args)) + return "result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg": "value"}, + step_id="step1", + condition="False" + ) + result = engine._execute_step(step, context) + assert result["status"] == "skipped" + assert result["step_id"] == "step1" + assert executed == [] + + def test_execute_step_failed_with_retry(self): + executed = [] + def tool_executor(tool_name, args): + executed.append((tool_name, args)) + if len(executed) < 2: + raise Exception("Temporary failure") + return "success_result" + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg": "value"}, + step_id="step1", + retry_count=1 + ) + result = engine._execute_step(step, context) + assert result["status"] == "success" + assert result["result"] == "success_result" + assert len(executed) == 2 + + def test_execute_step_failed(self): + executed = [] + def tool_executor(tool_name, args): + executed.append((tool_name, args)) + raise Exception("Permanent failure") + engine = WorkflowEngine(tool_executor) + context = WorkflowExecutionContext() + step = WorkflowStep( + tool_name="test_tool", + arguments={"arg": "value"}, + step_id="step1", + retry_count=1 + ) + result = engine._execute_step(step, context) + assert result["status"] == "failed" + assert result["error"] == "Permanent failure" + assert len(executed) == 2 + + def test_get_next_steps_sequential(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test", + description="test", + steps=[step1, step2], + execution_mode=ExecutionMode.SEQUENTIAL + ) + result = {"status": "success"} + next_steps = engine._get_next_steps(step1, result, workflow) + assert next_steps == [step2] + + def test_get_next_steps_on_success(self): + def tool_executor(tool_name, args): + return "result" + engine = WorkflowEngine(tool_executor) + step1 = WorkflowStep( + tool_name="tool1", + arguments={}, + step_id="step1", + on_success=["step2"] + ) + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test", + description="test", + steps=[step1, step2] + ) + result = {"status": "success"} + next_steps = engine._get_next_steps(step1, result, workflow) + assert next_steps == [step2] + + def test_execute_workflow_sequential(self): + executed = [] + def tool_executor(tool_name, args): + executed.append(tool_name) + return f"result_{tool_name}" + engine = WorkflowEngine(tool_executor) + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test", + description="test", + steps=[step1, step2], + execution_mode=ExecutionMode.SEQUENTIAL + ) + context = engine.execute_workflow(workflow) + assert executed == ["tool1", "tool2"] + assert context.get_step_result("step1") == "result_tool1" + assert context.get_step_result("step2") == "result_tool2" + + def test_execute_workflow_parallel(self): + executed = [] + def tool_executor(tool_name, args): + executed.append(tool_name) + return f"result_{tool_name}" + engine = WorkflowEngine(tool_executor) + step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") + step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") + workflow = Workflow( + name="test", + description="test", + steps=[step1, step2], + execution_mode=ExecutionMode.PARALLEL + ) + context = engine.execute_workflow(workflow) + assert set(executed) == {"tool1", "tool2"} + assert context.get_step_result("step1") == "result_tool1" + assert context.get_step_result("step2") == "result_tool2" \ No newline at end of file