import time
import re
from typing import Dict, Any, List, Callable, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

from .workflow_definition import Workflow, WorkflowStep, ExecutionMode


class WorkflowExecutionContext:
    """Shared state for a single workflow run: variables, per-step results, and an event log."""

    def __init__(self):
        self.variables: Dict[str, Any] = {}
        self.step_results: Dict[str, Any] = {}
        self.execution_log: List[Dict[str, Any]] = []

    def set_variable(self, name: str, value: Any):
        self.variables[name] = value

    def get_variable(self, name: str, default: Any = None) -> Any:
        return self.variables.get(name, default)

    def set_step_result(self, step_id: str, result: Any):
        self.step_results[step_id] = result

    def get_step_result(self, step_id: str) -> Any:
        return self.step_results.get(step_id)

    def log_event(self, event_type: str, step_id: str, details: Dict[str, Any]):
        # Append a timestamped entry so the run can be inspected after execution.
        self.execution_log.append({
            'timestamp': time.time(),
            'event_type': event_type,
            'step_id': step_id,
            'details': details
        })


class WorkflowEngine:
    """Executes a Workflow, either sequentially (following success/failure edges) or in parallel."""

    def __init__(self, tool_executor: Callable, max_workers: int = 5):
        # tool_executor is invoked as tool_executor(tool_name, arguments) for every step.
        self.tool_executor = tool_executor
        self.max_workers = max_workers

    def _evaluate_condition(self, condition: str, context: WorkflowExecutionContext) -> bool:
        # An empty condition always passes.
        if not condition:
            return True

        try:
            # Evaluate the expression with builtins disabled; only workflow
            # variables and prior step results are visible to the expression.
            safe_locals = {
                'variables': context.variables,
                'results': context.step_results
            }
            return bool(eval(condition, {"__builtins__": {}}, safe_locals))
        except Exception:
            # Any evaluation error is treated as a failed condition.
            return False

    def _substitute_variables(self, arguments: Dict[str, Any], context: WorkflowExecutionContext) -> Dict[str, Any]:
        # Replace ${step.<id>} and ${var.<name>} placeholders in string arguments
        # with prior step results and workflow variables, respectively.
        substituted = {}
        for key, value in arguments.items():
            if isinstance(value, str):
                pattern = r'\$\{([^}]+)\}'
                matches = re.findall(pattern, value)
                for match in matches:
                    if match.startswith('step.'):
                        step_id = match.split('.', 1)[1]
                        replacement = context.get_step_result(step_id)
                        if replacement is not None:
                            value = value.replace(f'${{{match}}}', str(replacement))
                    elif match.startswith('var.'):
                        var_name = match.split('.', 1)[1]
                        replacement = context.get_variable(var_name)
                        if replacement is not None:
                            value = value.replace(f'${{{match}}}', str(replacement))
                substituted[key] = value
            else:
                substituted[key] = value
        return substituted

    def _execute_step(self, step: WorkflowStep, context: WorkflowExecutionContext) -> Dict[str, Any]:
        # Skip the step entirely if its guard condition is not met.
        if not self._evaluate_condition(step.condition, context):
            context.log_event('skipped', step.step_id, {'reason': 'condition_not_met'})
            return {'status': 'skipped', 'step_id': step.step_id}

        arguments = self._substitute_variables(step.arguments, context)

        start_time = time.time()
        retry_attempts = 0
        last_error = None

        # Retry the tool call up to step.retry_count additional times with linear backoff.
        while retry_attempts <= step.retry_count:
            try:
                context.log_event('executing', step.step_id, {
                    'tool': step.tool_name,
                    'arguments': arguments,
                    'attempt': retry_attempts + 1
                })

                result = self.tool_executor(step.tool_name, arguments)

                execution_time = time.time() - start_time
                context.set_step_result(step.step_id, result)
                context.log_event('completed', step.step_id, {
                    'execution_time': execution_time,
                    'result_size': len(str(result)) if result else 0
                })

                return {
                    'status': 'success',
                    'step_id': step.step_id,
                    'result': result,
                    'execution_time': execution_time
                }

            except Exception as e:
                last_error = str(e)
                retry_attempts += 1
                if retry_attempts <= step.retry_count:
                    time.sleep(1 * retry_attempts)

        # All attempts failed; record the last error and report failure.
        context.log_event('failed', step.step_id, {'error': last_error})
        return {
            'status': 'failed',
            'step_id': step.step_id,
            'error': last_error,
            'execution_time': time.time() - start_time
        }

    def _get_next_steps(self, completed_step: WorkflowStep, result: Dict[str, Any],
                        workflow: Workflow) -> List[WorkflowStep]:
        # Follow explicit on_success/on_failure edges first; otherwise, in
        # sequential mode, fall back to the next step in declaration order.
        next_steps = []

        if result['status'] == 'success' and completed_step.on_success:
            for step_id in completed_step.on_success:
                step = workflow.get_step(step_id)
                if step:
                    next_steps.append(step)

        elif result['status'] == 'failed' and completed_step.on_failure:
            for step_id in completed_step.on_failure:
                step = workflow.get_step(step_id)
                if step:
                    next_steps.append(step)

        elif workflow.execution_mode == ExecutionMode.SEQUENTIAL:
            current_index = workflow.steps.index(completed_step)
            if current_index + 1 < len(workflow.steps):
                next_steps.append(workflow.steps[current_index + 1])

        return next_steps

    def execute_workflow(self, workflow: Workflow, initial_variables: Optional[Dict[str, Any]] = None) -> WorkflowExecutionContext:
        context = WorkflowExecutionContext()

        if initial_variables:
            context.variables.update(initial_variables)

        # Workflow-defined variables are applied last and therefore take
        # precedence over caller-supplied initial variables.
        if workflow.variables:
            context.variables.update(workflow.variables)

        context.log_event('workflow_started', 'workflow', {'name': workflow.name})

        if workflow.execution_mode == ExecutionMode.PARALLEL:
            # Run every step concurrently; all steps share the same context object.
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                futures = {
                    executor.submit(self._execute_step, step, context): step
                    for step in workflow.steps
                }

                for future in as_completed(futures):
                    step = futures[future]
                    try:
                        result = future.result()
                        context.log_event('step_completed', step.step_id, result)
                    except Exception as e:
                        context.log_event('step_failed', step.step_id, {'error': str(e)})

        else:
            # Sequential/conditional mode: walk the step graph from the workflow's
            # initial steps, following the edges returned by _get_next_steps.
            pending_steps = workflow.get_initial_steps()
            executed_step_ids = set()

            while pending_steps:
                step = pending_steps.pop(0)

                if step.step_id in executed_step_ids:
                    continue

                result = self._execute_step(step, context)
                executed_step_ids.add(step.step_id)

                next_steps = self._get_next_steps(step, result, workflow)
                pending_steps.extend(next_steps)

        context.log_event('workflow_completed', 'workflow', {
            'total_steps': len(context.step_results),
            'executed_steps': list(context.step_results.keys())
        })

        return context
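

# --- Illustrative usage sketch ---------------------------------------------
# Not part of the engine itself: a minimal sketch of how a caller might drive
# WorkflowEngine with a stub tool executor. The WorkflowStep/Workflow keyword
# arguments shown below are assumptions about workflow_definition, not its
# confirmed API; adjust them to the real constructors.
#
#     def echo_tool(tool_name: str, arguments: Dict[str, Any]) -> str:
#         return f"{tool_name} ran with {arguments}"
#
#     engine = WorkflowEngine(tool_executor=echo_tool)
#     step = WorkflowStep(step_id="fetch", tool_name="http_get",            # assumed ctor
#                         arguments={"url": "${var.base_url}/items"})
#     wf = Workflow(name="demo", steps=[step],                              # assumed ctor
#                   execution_mode=ExecutionMode.SEQUENTIAL,
#                   variables={"base_url": "https://example.com"})
#     ctx = engine.execute_workflow(wf)
#     print(ctx.execution_log)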