import pytest import tempfile import os from rp.workflows.workflow_definition import ExecutionMode, Workflow, WorkflowStep from rp.workflows.workflow_storage import WorkflowStorage from rp.workflows.workflow_engine import WorkflowExecutionContext, WorkflowEngine class TestWorkflowStep: def test_init(self): step = WorkflowStep( tool_name="test_tool", arguments={"arg1": "value1"}, step_id="step1", condition="True", on_success=["step2"], on_failure=["step3"], retry_count=2, timeout_seconds=600 ) assert step.tool_name == "test_tool" assert step.arguments == {"arg1": "value1"} assert step.step_id == "step1" assert step.condition == "True" assert step.on_success == ["step2"] assert step.on_failure == ["step3"] assert step.retry_count == 2 assert step.timeout_seconds == 600 def test_to_dict(self): step = WorkflowStep( tool_name="test_tool", arguments={"arg1": "value1"}, step_id="step1" ) expected = { "tool_name": "test_tool", "arguments": {"arg1": "value1"}, "step_id": "step1", "condition": None, "on_success": None, "on_failure": None, "retry_count": 0, "timeout_seconds": 300, } assert step.to_dict() == expected def test_from_dict(self): data = { "tool_name": "test_tool", "arguments": {"arg1": "value1"}, "step_id": "step1", "condition": "True", "on_success": ["step2"], "on_failure": ["step3"], "retry_count": 2, "timeout_seconds": 600, } step = WorkflowStep.from_dict(data) assert step.tool_name == "test_tool" assert step.arguments == {"arg1": "value1"} assert step.step_id == "step1" assert step.condition == "True" assert step.on_success == ["step2"] assert step.on_failure == ["step3"] assert step.retry_count == 2 assert step.timeout_seconds == 600 class TestWorkflow: def test_init(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1, step2], execution_mode=ExecutionMode.PARALLEL, variables={"var1": "value1"}, tags=["tag1", "tag2"] ) assert workflow.name == "test_workflow" assert workflow.description == "A test workflow" assert workflow.steps == [step1, step2] assert workflow.execution_mode == ExecutionMode.PARALLEL assert workflow.variables == {"var1": "value1"} assert workflow.tags == ["tag1", "tag2"] def test_to_dict(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1], execution_mode=ExecutionMode.SEQUENTIAL, variables={"var1": "value1"}, tags=["tag1"] ) expected = { "name": "test_workflow", "description": "A test workflow", "steps": [step1.to_dict()], "execution_mode": "sequential", "variables": {"var1": "value1"}, "tags": ["tag1"], } assert workflow.to_dict() == expected def test_from_dict(self): data = { "name": "test_workflow", "description": "A test workflow", "steps": [{ "tool_name": "tool1", "arguments": {}, "step_id": "step1", "condition": None, "on_success": None, "on_failure": None, "retry_count": 0, "timeout_seconds": 300, }], "execution_mode": "parallel", "variables": {"var1": "value1"}, "tags": ["tag1"], } workflow = Workflow.from_dict(data) assert workflow.name == "test_workflow" assert workflow.description == "A test workflow" assert len(workflow.steps) == 1 assert workflow.steps[0].tool_name == "tool1" assert workflow.execution_mode == ExecutionMode.PARALLEL assert workflow.variables == {"var1": "value1"} assert workflow.tags == ["tag1"] def test_add_step(self): workflow = Workflow( name="test_workflow", description="A test workflow", steps=[] ) step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow.add_step(step) assert workflow.steps == [step] def test_get_step(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1, step2] ) assert workflow.get_step("step1") == step1 assert workflow.get_step("step2") == step2 assert workflow.get_step("nonexistent") is None def test_get_initial_steps_sequential(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1, step2], execution_mode=ExecutionMode.SEQUENTIAL ) assert workflow.get_initial_steps() == [step1] def test_get_initial_steps_parallel(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1, step2], execution_mode=ExecutionMode.PARALLEL ) assert workflow.get_initial_steps() == [step1, step2] def test_get_initial_steps_conditional(self): step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1", condition="True") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step1, step2], execution_mode=ExecutionMode.CONDITIONAL ) assert workflow.get_initial_steps() == [step2] # Only step without condition class TestWorkflowStorage: def setup_method(self): self.temp_db = tempfile.NamedTemporaryFile(delete=False) self.temp_db.close() self.storage = WorkflowStorage(self.temp_db.name) def teardown_method(self): os.unlink(self.temp_db.name) def test_save_and_load_workflow(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) workflow_id = self.storage.save_workflow(workflow) loaded = self.storage.load_workflow(workflow_id) assert loaded is not None assert loaded.name == "test_workflow" assert loaded.description == "A test workflow" assert len(loaded.steps) == 1 assert loaded.steps[0].tool_name == "tool1" def test_load_workflow_not_found(self): loaded = self.storage.load_workflow("nonexistent") assert loaded is None def test_load_workflow_by_name(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) self.storage.save_workflow(workflow) loaded = self.storage.load_workflow_by_name("test_workflow") assert loaded is not None assert loaded.name == "test_workflow" def test_load_workflow_by_name_not_found(self): loaded = self.storage.load_workflow_by_name("nonexistent") assert loaded is None def test_list_workflows(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step], tags=["tag1"] ) self.storage.save_workflow(workflow) workflows = self.storage.list_workflows() assert len(workflows) == 1 assert workflows[0]["name"] == "test_workflow" assert workflows[0]["tags"] == ["tag1"] def test_list_workflows_with_tag(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow1 = Workflow( name="test_workflow1", description="A test workflow", steps=[step], tags=["tag1"] ) workflow2 = Workflow( name="test_workflow2", description="A test workflow", steps=[step], tags=["tag2"] ) self.storage.save_workflow(workflow1) self.storage.save_workflow(workflow2) workflows = self.storage.list_workflows("tag1") assert len(workflows) == 1 assert workflows[0]["name"] == "test_workflow1" def test_delete_workflow(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) workflow_id = self.storage.save_workflow(workflow) deleted = self.storage.delete_workflow(workflow_id) assert deleted is True loaded = self.storage.load_workflow(workflow_id) assert loaded is None def test_delete_workflow_not_found(self): deleted = self.storage.delete_workflow("nonexistent") assert deleted is False def test_save_execution(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) workflow_id = self.storage.save_workflow(workflow) context = WorkflowExecutionContext() context.set_step_result("step1", "result") context.log_event("completed", "step1", {}) execution_id = self.storage.save_execution(workflow_id, context) assert execution_id is not None history = self.storage.get_execution_history(workflow_id) assert len(history) == 1 assert history[0]["execution_id"] == execution_id def test_get_execution_history(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) workflow_id = self.storage.save_workflow(workflow) context = WorkflowExecutionContext() context.set_step_result("step1", "result") context.log_event("completed", "step1", {}) self.storage.save_execution(workflow_id, context) history = self.storage.get_execution_history(workflow_id) assert len(history) == 1 def test_get_execution_history_limit(self): step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") workflow = Workflow( name="test_workflow", description="A test workflow", steps=[step] ) workflow_id = self.storage.save_workflow(workflow) for i in range(5): context = WorkflowExecutionContext() context.set_step_result("step1", f"result{i}") context.log_event("completed", "step1", {}) self.storage.save_execution(workflow_id, context) history = self.storage.get_execution_history(workflow_id, limit=3) assert len(history) == 3 class TestWorkflowExecutionContext: def test_init(self): context = WorkflowExecutionContext() assert context.variables == {} assert context.step_results == {} assert context.execution_log == [] def test_set_variable(self): context = WorkflowExecutionContext() context.set_variable("var1", "value1") assert context.get_variable("var1") == "value1" assert context.get_variable("nonexistent") is None assert context.get_variable("nonexistent", "default") == "default" def test_set_step_result(self): context = WorkflowExecutionContext() context.set_step_result("step1", "result1") assert context.get_step_result("step1") == "result1" def test_log_event(self): context = WorkflowExecutionContext() context.log_event("test_event", "step1", {"detail": "value"}) assert len(context.execution_log) == 1 log_entry = context.execution_log[0] assert log_entry["event_type"] == "test_event" assert log_entry["step_id"] == "step1" assert log_entry["details"] == {"detail": "value"} assert "timestamp" in log_entry class TestWorkflowEngine: def test_init(self): def tool_executor(tool_name, args): return f"executed {tool_name} with {args}" engine = WorkflowEngine(tool_executor, max_workers=10) assert engine.tool_executor == tool_executor assert engine.max_workers == 10 def test_evaluate_condition_true(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() assert engine._evaluate_condition("True", context) is True def test_evaluate_condition_false(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() assert engine._evaluate_condition("False", context) is False def test_evaluate_condition_with_variables(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() context.set_variable("test_var", "test_value") assert engine._evaluate_condition("variables['test_var'] == 'test_value'", context) is True def test_substitute_variables(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() context.set_variable("var1", "value1") context.set_step_result("step1", "result1") arguments = { "arg1": "${var.var1}", "arg2": "${step.step1}", "arg3": "plain_value" } substituted = engine._substitute_variables(arguments, context) assert substituted["arg1"] == "value1" assert substituted["arg2"] == "result1" assert substituted["arg3"] == "plain_value" def test_execute_step_success(self): executed = [] def tool_executor(tool_name, args): executed.append((tool_name, args)) return "success_result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() step = WorkflowStep( tool_name="test_tool", arguments={"arg": "value"}, step_id="step1" ) result = engine._execute_step(step, context) assert result["status"] == "success" assert result["step_id"] == "step1" assert result["result"] == "success_result" assert executed == [("test_tool", {"arg": "value"})] assert context.get_step_result("step1") == "success_result" def test_execute_step_skipped(self): executed = [] def tool_executor(tool_name, args): executed.append((tool_name, args)) return "result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() step = WorkflowStep( tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", condition="False" ) result = engine._execute_step(step, context) assert result["status"] == "skipped" assert result["step_id"] == "step1" assert executed == [] def test_execute_step_failed_with_retry(self): executed = [] def tool_executor(tool_name, args): executed.append((tool_name, args)) if len(executed) < 2: raise Exception("Temporary failure") return "success_result" engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() step = WorkflowStep( tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", retry_count=1 ) result = engine._execute_step(step, context) assert result["status"] == "success" assert result["result"] == "success_result" assert len(executed) == 2 def test_execute_step_failed(self): executed = [] def tool_executor(tool_name, args): executed.append((tool_name, args)) raise Exception("Permanent failure") engine = WorkflowEngine(tool_executor) context = WorkflowExecutionContext() step = WorkflowStep( tool_name="test_tool", arguments={"arg": "value"}, step_id="step1", retry_count=1 ) result = engine._execute_step(step, context) assert result["status"] == "failed" assert result["error"] == "Permanent failure" assert len(executed) == 2 def test_get_next_steps_sequential(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test", description="test", steps=[step1, step2], execution_mode=ExecutionMode.SEQUENTIAL ) result = {"status": "success"} next_steps = engine._get_next_steps(step1, result, workflow) assert next_steps == [step2] def test_get_next_steps_on_success(self): def tool_executor(tool_name, args): return "result" engine = WorkflowEngine(tool_executor) step1 = WorkflowStep( tool_name="tool1", arguments={}, step_id="step1", on_success=["step2"] ) step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test", description="test", steps=[step1, step2] ) result = {"status": "success"} next_steps = engine._get_next_steps(step1, result, workflow) assert next_steps == [step2] def test_execute_workflow_sequential(self): executed = [] def tool_executor(tool_name, args): executed.append(tool_name) return f"result_{tool_name}" engine = WorkflowEngine(tool_executor) step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test", description="test", steps=[step1, step2], execution_mode=ExecutionMode.SEQUENTIAL ) context = engine.execute_workflow(workflow) assert executed == ["tool1", "tool2"] assert context.get_step_result("step1") == "result_tool1" assert context.get_step_result("step2") == "result_tool2" def test_execute_workflow_parallel(self): executed = [] def tool_executor(tool_name, args): executed.append(tool_name) return f"result_{tool_name}" engine = WorkflowEngine(tool_executor) step1 = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1") step2 = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2") workflow = Workflow( name="test", description="test", steps=[step1, step2], execution_mode=ExecutionMode.PARALLEL ) context = engine.execute_workflow(workflow) assert set(executed) == {"tool1", "tool2"} assert context.get_step_result("step1") == "result_tool1" assert context.get_step_result("step2") == "result_tool2"