# Listing metadata from the repository web viewer: 512 lines, 20 KiB, Python.
# (Viewer controls: Raw / Normal View / History)

import tempfile
import os
from rp.workflows.workflow_definition import ExecutionMode, Workflow, WorkflowStep
from rp.workflows.workflow_storage import WorkflowStorage
from rp.workflows.workflow_engine import WorkflowExecutionContext, WorkflowEngine
class TestWorkflowStep:
    """Tests for building a ``WorkflowStep`` and converting it to/from a dict."""

    @staticmethod
    def _full_fields():
        """Return a fresh dict in which every step field is set explicitly.

        A new dict (with new nested containers) is produced on each call so
        the values handed to the code under test are never the same objects
        as the values the assertions compare against.
        """
        return {
            "tool_name": "test_tool",
            "arguments": {"arg1": "value1"},
            "step_id": "step1",
            "condition": "True",
            "on_success": ["step2"],
            "on_failure": ["step3"],
            "retry_count": 2,
            "timeout_seconds": 600,
        }

    def _assert_full_fields(self, step):
        """Assert that *step* exposes every value from ``_full_fields``."""
        for attr_name, wanted in self._full_fields().items():
            assert getattr(step, attr_name) == wanted

    def test_init(self):
        """Constructor keyword arguments land on the same-named attributes."""
        built = WorkflowStep(**self._full_fields())
        self._assert_full_fields(built)

    def test_to_dict(self):
        """``to_dict`` emits the given fields plus the expected defaults."""
        minimal = WorkflowStep(
            tool_name="test_tool",
            arguments={"arg1": "value1"},
            step_id="step1",
        )
        serialised = minimal.to_dict()

        # Fields passed to the constructor above.
        wanted = {
            "tool_name": "test_tool",
            "arguments": {"arg1": "value1"},
            "step_id": "step1",
        }
        # Fields that were left out and are expected to carry these values.
        wanted.update(
            {
                "condition": None,
                "on_success": None,
                "on_failure": None,
                "retry_count": 0,
                "timeout_seconds": 300,
            }
        )
        assert serialised == wanted

    def test_from_dict(self):
        """``from_dict`` restores every field from a fully populated mapping."""
        restored = WorkflowStep.from_dict(self._full_fields())
        self._assert_full_fields(restored)
class TestWorkflow:
    """Tests for ``Workflow`` construction, serialisation and step lookup."""

    @staticmethod
    def _step(number, **extra):
        """Build step ``step<number>`` running ``tool<number>`` with no arguments."""
        return WorkflowStep(
            tool_name=f"tool{number}",
            arguments={},
            step_id=f"step{number}",
            **extra,
        )

    @staticmethod
    def _workflow(steps, **extra):
        """Build the standard test workflow around *steps*.

        Optional constructor arguments are only forwarded when the caller
        supplies them, so omitted ones keep whatever default ``Workflow`` uses.
        """
        return Workflow(
            name="test_workflow",
            description="A test workflow",
            steps=steps,
            **extra,
        )

    def test_init(self):
        """Constructor arguments are stored on the matching attributes."""
        first, second = self._step(1), self._step(2)
        wf = self._workflow(
            [first, second],
            execution_mode=ExecutionMode.PARALLEL,
            variables={"var1": "value1"},
            tags=["tag1", "tag2"],
        )
        assert wf.name == "test_workflow"
        assert wf.description == "A test workflow"
        assert wf.steps == [first, second]
        assert wf.execution_mode == ExecutionMode.PARALLEL
        assert wf.variables == {"var1": "value1"}
        assert wf.tags == ["tag1", "tag2"]

    def test_to_dict(self):
        """``to_dict`` nests serialised steps and spells the mode as a string."""
        only_step = self._step(1)
        wf = self._workflow(
            [only_step],
            execution_mode=ExecutionMode.SEQUENTIAL,
            variables={"var1": "value1"},
            tags=["tag1"],
        )
        wanted = {
            "name": "test_workflow",
            "description": "A test workflow",
            "steps": [only_step.to_dict()],
            "execution_mode": "sequential",
            "variables": {"var1": "value1"},
            "tags": ["tag1"],
        }
        assert wf.to_dict() == wanted

    def test_from_dict(self):
        """``from_dict`` rebuilds the workflow, its step and its execution mode."""
        step_payload = {
            "tool_name": "tool1",
            "arguments": {},
            "step_id": "step1",
            "condition": None,
            "on_success": None,
            "on_failure": None,
            "retry_count": 0,
            "timeout_seconds": 300,
        }
        payload = {
            "name": "test_workflow",
            "description": "A test workflow",
            "steps": [step_payload],
            "execution_mode": "parallel",
            "variables": {"var1": "value1"},
            "tags": ["tag1"],
        }
        wf = Workflow.from_dict(payload)
        assert wf.name == "test_workflow"
        assert wf.description == "A test workflow"
        assert len(wf.steps) == 1
        assert wf.steps[0].tool_name == "tool1"
        assert wf.execution_mode == ExecutionMode.PARALLEL
        assert wf.variables == {"var1": "value1"}
        assert wf.tags == ["tag1"]

    def test_add_step(self):
        """``add_step`` appends to a workflow that started without steps."""
        wf = self._workflow([])
        added = self._step(1)
        wf.add_step(added)
        assert wf.steps == [added]

    def test_get_step(self):
        """``get_step`` finds steps by id and yields ``None`` for unknown ids."""
        first, second = self._step(1), self._step(2)
        wf = self._workflow([first, second])
        assert wf.get_step("step1") == first
        assert wf.get_step("step2") == second
        assert wf.get_step("nonexistent") is None

    def test_get_initial_steps_sequential(self):
        """In sequential mode only the first step is an initial step."""
        first, second = self._step(1), self._step(2)
        wf = self._workflow([first, second], execution_mode=ExecutionMode.SEQUENTIAL)
        assert wf.get_initial_steps() == [first]

    def test_get_initial_steps_parallel(self):
        """In parallel mode every step is an initial step."""
        first, second = self._step(1), self._step(2)
        wf = self._workflow([first, second], execution_mode=ExecutionMode.PARALLEL)
        assert wf.get_initial_steps() == [first, second]

    def test_get_initial_steps_conditional(self):
        """In conditional mode only the step without a condition is initial."""
        guarded = self._step(1, condition="True")
        unguarded = self._step(2)
        wf = self._workflow([guarded, unguarded], execution_mode=ExecutionMode.CONDITIONAL)
        assert wf.get_initial_steps() == [unguarded]
class TestWorkflowStorage:
    """Tests for ``WorkflowStorage`` using a throwaway temporary file per test."""

    def setup_method(self):
        """Create a closed temporary file and open a storage on its path."""
        handle = tempfile.NamedTemporaryFile(delete=False)
        # Close our own handle right away; only the path is handed to the storage.
        handle.close()
        self.temp_db = handle
        self.storage = WorkflowStorage(handle.name)

    def teardown_method(self):
        """Remove the temporary file created in ``setup_method``."""
        os.unlink(self.temp_db.name)

    @staticmethod
    def _make_workflow(name="test_workflow", steps=None, **extra):
        """Build a workflow with the standard description.

        When *steps* is omitted a single ``tool1``/``step1`` step is created.
        Extra keyword arguments (for example ``tags``) are forwarded only when
        given, so omitted ones keep whatever default ``Workflow`` uses.
        """
        if steps is None:
            steps = [WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")]
        return Workflow(name=name, description="A test workflow", steps=steps, **extra)

    def _save_default_workflow(self):
        """Persist the default single-step workflow and return its id."""
        return self.storage.save_workflow(self._make_workflow())

    def _record_execution(self, workflow_id, result="result"):
        """Save one execution with a ``step1`` result and a "completed" event."""
        ctx = WorkflowExecutionContext()
        ctx.set_step_result("step1", result)
        ctx.log_event("completed", "step1", {})
        return self.storage.save_execution(workflow_id, ctx)

    def test_save_and_load_workflow(self):
        """A saved workflow can be loaded back by the id ``save_workflow`` returned."""
        workflow_id = self._save_default_workflow()
        fetched = self.storage.load_workflow(workflow_id)
        assert fetched is not None
        assert fetched.name == "test_workflow"
        assert fetched.description == "A test workflow"
        assert len(fetched.steps) == 1
        assert fetched.steps[0].tool_name == "tool1"

    def test_load_workflow_not_found(self):
        """Loading an unknown id yields ``None``."""
        assert self.storage.load_workflow("nonexistent") is None

    def test_load_workflow_by_name(self):
        """A saved workflow can be loaded back by its name."""
        self._save_default_workflow()
        fetched = self.storage.load_workflow_by_name("test_workflow")
        assert fetched is not None
        assert fetched.name == "test_workflow"

    def test_load_workflow_by_name_not_found(self):
        """Loading an unknown name yields ``None``."""
        assert self.storage.load_workflow_by_name("nonexistent") is None

    def test_list_workflows(self):
        """Listing returns one entry carrying the workflow's name and tags."""
        self.storage.save_workflow(self._make_workflow(tags=["tag1"]))
        listing = self.storage.list_workflows()
        assert len(listing) == 1
        entry = listing[0]
        assert entry["name"] == "test_workflow"
        assert entry["tags"] == ["tag1"]

    def test_list_workflows_with_tag(self):
        """Listing with a tag argument returns only the workflow carrying that tag."""
        # Both workflows deliberately reference the very same step object.
        shared_step = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
        tagged_one = self._make_workflow("test_workflow1", [shared_step], tags=["tag1"])
        tagged_two = self._make_workflow("test_workflow2", [shared_step], tags=["tag2"])
        for wf in (tagged_one, tagged_two):
            self.storage.save_workflow(wf)
        listing = self.storage.list_workflows("tag1")
        assert len(listing) == 1
        assert listing[0]["name"] == "test_workflow1"

    def test_delete_workflow(self):
        """Deleting a saved workflow reports ``True`` and makes it unloadable."""
        workflow_id = self._save_default_workflow()
        assert self.storage.delete_workflow(workflow_id) is True
        assert self.storage.load_workflow(workflow_id) is None

    def test_delete_workflow_not_found(self):
        """Deleting an unknown id reports ``False``."""
        assert self.storage.delete_workflow("nonexistent") is False

    def test_save_execution(self):
        """A saved execution gets an id and shows up in the workflow's history."""
        workflow_id = self._save_default_workflow()
        execution_id = self._record_execution(workflow_id)
        assert execution_id is not None
        records = self.storage.get_execution_history(workflow_id)
        assert len(records) == 1
        assert records[0]["execution_id"] == execution_id

    def test_get_execution_history(self):
        """History holds exactly one record after one saved execution."""
        workflow_id = self._save_default_workflow()
        self._record_execution(workflow_id)
        records = self.storage.get_execution_history(workflow_id)
        assert len(records) == 1

    def test_get_execution_history_limit(self):
        """``limit`` caps the number of history records returned."""
        workflow_id = self._save_default_workflow()
        for run_number in range(5):
            self._record_execution(workflow_id, f"result{run_number}")
        records = self.storage.get_execution_history(workflow_id, limit=3)
        assert len(records) == 3
class TestWorkflowExecutionContext:
    """Tests for the variable, step-result and event-log API of the context."""

    def test_init(self):
        """A new context starts with empty variables, results and log."""
        fresh = WorkflowExecutionContext()
        initial_state = (
            ("variables", {}),
            ("step_results", {}),
            ("execution_log", []),
        )
        for attr_name, empty_value in initial_state:
            assert getattr(fresh, attr_name) == empty_value

    def test_set_variable(self):
        """Variables round-trip; missing ones give ``None`` or the supplied default."""
        ctx = WorkflowExecutionContext()
        ctx.set_variable("var1", "value1")
        stored = ctx.get_variable("var1")
        assert stored == "value1"
        missing = ctx.get_variable("nonexistent")
        assert missing is None
        fallback = ctx.get_variable("nonexistent", "default")
        assert fallback == "default"

    def test_set_step_result(self):
        """A step result can be read back under the same step id."""
        ctx = WorkflowExecutionContext()
        ctx.set_step_result("step1", "result1")
        assert ctx.get_step_result("step1") == "result1"

    def test_log_event(self):
        """``log_event`` appends one entry with type, step id, details and timestamp."""
        ctx = WorkflowExecutionContext()
        ctx.log_event("test_event", "step1", {"detail": "value"})
        entries = ctx.execution_log
        assert len(entries) == 1
        entry = entries[0]
        assert entry["event_type"] == "test_event"
        assert entry["step_id"] == "step1"
        assert entry["details"] == {"detail": "value"}
        assert "timestamp" in entry
class TestWorkflowEngine:
    """Tests for ``WorkflowEngine``: conditions, substitution, steps and whole runs."""

    @staticmethod
    def _stub_engine():
        """Build an engine whose tool executor ignores its input and returns "result"."""

        def tool_executor(tool_name, args):
            return "result"

        return WorkflowEngine(tool_executor)

    @staticmethod
    def _step_pair(**first_extra):
        """Return (step1, step2); extra keyword arguments apply to step1 only."""
        first = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1", **first_extra)
        second = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
        return first, second

    @staticmethod
    def _run_two_step_workflow(mode):
        """Execute a two-step workflow in *mode*.

        Returns the list of tool names in the order the executor was invoked,
        together with the context returned by ``execute_workflow``.
        """
        invoked = []

        def tool_executor(tool_name, args):
            invoked.append(tool_name)
            return f"result_{tool_name}"

        engine = WorkflowEngine(tool_executor)
        first = WorkflowStep(tool_name="tool1", arguments={}, step_id="step1")
        second = WorkflowStep(tool_name="tool2", arguments={}, step_id="step2")
        workflow = Workflow(
            name="test",
            description="test",
            steps=[first, second],
            execution_mode=mode,
        )
        return invoked, engine.execute_workflow(workflow)

    def test_init(self):
        """The engine keeps the executor and ``max_workers`` it was given."""

        def tool_executor(tool_name, args):
            return f"executed {tool_name} with {args}"

        engine = WorkflowEngine(tool_executor, max_workers=10)
        assert engine.tool_executor == tool_executor
        assert engine.max_workers == 10

    def test_evaluate_condition_true(self):
        """The condition string "True" evaluates to ``True``."""
        engine = self._stub_engine()
        ctx = WorkflowExecutionContext()
        verdict = engine._evaluate_condition("True", ctx)
        assert verdict is True

    def test_evaluate_condition_false(self):
        """The condition string "False" evaluates to ``False``."""
        engine = self._stub_engine()
        ctx = WorkflowExecutionContext()
        verdict = engine._evaluate_condition("False", ctx)
        assert verdict is False

    def test_evaluate_condition_with_variables(self):
        """A condition can read context variables through ``variables[...]``."""
        engine = self._stub_engine()
        ctx = WorkflowExecutionContext()
        ctx.set_variable("test_var", "test_value")
        verdict = engine._evaluate_condition("variables['test_var'] == 'test_value'", ctx)
        assert verdict is True

    def test_substitute_variables(self):
        """``${var.*}`` and ``${step.*}`` placeholders are replaced; plain values pass through."""
        engine = self._stub_engine()
        ctx = WorkflowExecutionContext()
        ctx.set_variable("var1", "value1")
        ctx.set_step_result("step1", "result1")
        raw_arguments = {
            "arg1": "${var.var1}",
            "arg2": "${step.step1}",
            "arg3": "plain_value",
        }
        resolved = engine._substitute_variables(raw_arguments, ctx)
        assert resolved["arg1"] == "value1"
        assert resolved["arg2"] == "result1"
        assert resolved["arg3"] == "plain_value"

    def test_execute_step_success(self):
        """A successful step reports its result and records it in the context."""
        calls = []

        def tool_executor(tool_name, args):
            calls.append((tool_name, args))
            return "success_result"

        engine = WorkflowEngine(tool_executor)
        ctx = WorkflowExecutionContext()
        step = WorkflowStep(tool_name="test_tool", arguments={"arg": "value"}, step_id="step1")
        outcome = engine._execute_step(step, ctx)
        assert outcome["status"] == "success"
        assert outcome["step_id"] == "step1"
        assert outcome["result"] == "success_result"
        assert calls == [("test_tool", {"arg": "value"})]
        assert ctx.get_step_result("step1") == "success_result"

    def test_execute_step_skipped(self):
        """A step whose condition is "False" is skipped and its tool never runs."""
        calls = []

        def tool_executor(tool_name, args):
            calls.append((tool_name, args))
            return "result"

        engine = WorkflowEngine(tool_executor)
        ctx = WorkflowExecutionContext()
        step = WorkflowStep(
            tool_name="test_tool",
            arguments={"arg": "value"},
            step_id="step1",
            condition="False",
        )
        outcome = engine._execute_step(step, ctx)
        assert outcome["status"] == "skipped"
        assert outcome["step_id"] == "step1"
        assert calls == []

    def test_execute_step_failed_with_retry(self):
        """With ``retry_count=1`` a tool failing once succeeds on the second call."""
        attempts = []

        def tool_executor(tool_name, args):
            attempts.append((tool_name, args))
            # Only the first invocation fails; every later one succeeds.
            if len(attempts) >= 2:
                return "success_result"
            raise Exception("Temporary failure")

        engine = WorkflowEngine(tool_executor)
        ctx = WorkflowExecutionContext()
        step = WorkflowStep(
            tool_name="test_tool",
            arguments={"arg": "value"},
            step_id="step1",
            retry_count=1,
        )
        outcome = engine._execute_step(step, ctx)
        assert outcome["status"] == "success"
        assert outcome["result"] == "success_result"
        assert len(attempts) == 2

    def test_execute_step_failed(self):
        """A tool that always raises yields "failed" after the initial call plus one retry."""
        attempts = []

        def tool_executor(tool_name, args):
            attempts.append((tool_name, args))
            raise Exception("Permanent failure")

        engine = WorkflowEngine(tool_executor)
        ctx = WorkflowExecutionContext()
        step = WorkflowStep(
            tool_name="test_tool",
            arguments={"arg": "value"},
            step_id="step1",
            retry_count=1,
        )
        outcome = engine._execute_step(step, ctx)
        assert outcome["status"] == "failed"
        assert outcome["error"] == "Permanent failure"
        assert len(attempts) == 2

    def test_get_next_steps_sequential(self):
        """In sequential mode the step after a successful one is next."""
        engine = self._stub_engine()
        first, second = self._step_pair()
        workflow = Workflow(
            name="test",
            description="test",
            steps=[first, second],
            execution_mode=ExecutionMode.SEQUENTIAL,
        )
        followers = engine._get_next_steps(first, {"status": "success"}, workflow)
        assert followers == [second]

    def test_get_next_steps_on_success(self):
        """A successful step's ``on_success`` ids select the next steps."""
        engine = self._stub_engine()
        first, second = self._step_pair(on_success=["step2"])
        workflow = Workflow(name="test", description="test", steps=[first, second])
        followers = engine._get_next_steps(first, {"status": "success"}, workflow)
        assert followers == [second]

    def test_execute_workflow_sequential(self):
        """A sequential run invokes the tools in order and stores both results."""
        invoked, ctx = self._run_two_step_workflow(ExecutionMode.SEQUENTIAL)
        assert invoked == ["tool1", "tool2"]
        assert ctx.get_step_result("step1") == "result_tool1"
        assert ctx.get_step_result("step2") == "result_tool2"

    def test_execute_workflow_parallel(self):
        """A parallel run invokes both tools (order not checked) and stores both results."""
        invoked, ctx = self._run_two_step_workflow(ExecutionMode.PARALLEL)
        assert set(invoked) == {"tool1", "tool2"}
        assert ctx.get_step_result("step1") == "result_tool1"
        assert ctx.get_step_result("step2") == "result_tool2"