|
|
|
#!/usr/bin/env python3
|
|
"""
|
|
Autonomous Agent Complex Task Testing Framework
|
|
Tests agent capability to execute and complete multi-step tasks independently
|
|
"""
|
|
|
|
import json
import subprocess
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List
|
|
|
|
class TaskComplexity(Enum):
    """Difficulty tier of a test case; the value doubles as a 1-5 rating."""

    BASIC = 1         # single-step file/directory operations
    INTERMEDIATE = 2  # multi-step data processing or a single API call
    ADVANCED = 3      # pipelines, parallel fetches, nested error handling
    EXPERT = 4        # multi-stage pipelines with state/retry logic
    EXTREME = 5       # full systems (ETL, refactoring, test generation)
|
|
|
|
@dataclass
class TestCase:
    """Declarative description of one agent task plus how to verify it."""

    id: str                                    # unique identifier, e.g. "T001"
    name: str                                  # human-readable title
    complexity: TaskComplexity                 # difficulty tier (1-5)
    task_description: str                      # prompt passed verbatim to the agent
    verification_checks: List[Dict[str, Any]]  # consumed by AgentTester.verify_output
    timeout_seconds: int                       # wall-clock limit for the agent run
    expected_steps: List[str]                  # informational; printed, not enforced
    success_criteria: Dict[str, Any]           # e.g. {'min_verifications': N}
|
|
|
|
@dataclass
class TestResult:
    """Outcome of a single TestCase execution."""

    test_id: str                           # matches TestCase.id
    success: bool                          # overall pass/fail per success_criteria
    execution_time: float                  # seconds spent running the agent
    steps_completed: List[str]             # currently always [] (not tracked)
    verification_results: Dict[str, bool]  # check name -> passed
    error_message: str = ""
    agent_output: str = ""                 # truncated output preview
    # FIX: was `output_tail: List[str] = None`, a None default on a list-typed
    # field (violates the annotation and forces None checks downstream).
    # default_factory gives each instance its own fresh list.
    output_tail: List[str] = field(default_factory=list)
|
|
|
|
class AgentTester:
|
|
def __init__(self, log_file="agent_test_results.json", tail_lines=20):
|
|
self.log_file = log_file
|
|
self.results = []
|
|
self.tail_lines = tail_lines
|
|
|
|
def cleanup_directory(self):
|
|
"""Delete all files in current directory except test.py"""
|
|
import os
|
|
import shutil
|
|
print(" -> Cleaning up directory...")
|
|
for item in os.listdir('.'):
|
|
if item == 'test.py':
|
|
continue
|
|
try:
|
|
if os.path.isfile(item) or os.path.islink(item):
|
|
os.unlink(item)
|
|
elif os.path.isdir(item):
|
|
shutil.rmtree(item)
|
|
except Exception as e:
|
|
print(f" ! Failed to delete {item}: {e}")
|
|
|
|
def execute_agent_task(self, task: str, timeout: int) -> tuple[str, float, List[str]]:
|
|
"""Execute agent command and return output with timing and tail"""
|
|
start_time = time.time()
|
|
output_lines = []
|
|
tail_buffer = deque(maxlen=self.tail_lines)
|
|
db_file = ".r.db"
|
|
|
|
try:
|
|
process = subprocess.Popen(
|
|
['r', task],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
bufsize=1
|
|
)
|
|
|
|
while True:
|
|
line = process.stdout.readline()
|
|
if line == '' and process.poll() is not None:
|
|
break
|
|
if line:
|
|
output_lines.append(line.rstrip())
|
|
tail_buffer.append(line.rstrip())
|
|
print(f" │ {line.rstrip()}")
|
|
|
|
process.wait(timeout=timeout)
|
|
execution_time = time.time() - start_time
|
|
full_output = '\\n'.join(output_lines)
|
|
tail = list(tail_buffer)
|
|
|
|
return full_output, execution_time, tail
|
|
|
|
except subprocess.TimeoutExpired:
|
|
process.kill()
|
|
execution_time = time.time() - start_time
|
|
return "TIMEOUT_ERROR", execution_time, list(tail_buffer)
|
|
except Exception as e:
|
|
execution_time = time.time() - start_time
|
|
return f"EXECUTION_ERROR: {str(e)}", execution_time, []
|
|
|
|
def verify_output(self, output: str, checks: List[Dict[str, Any]]) -> Dict[str, bool]:
|
|
"""Run verification checks on agent output"""
|
|
import os
|
|
import re
|
|
|
|
results = {}
|
|
for check in checks:
|
|
check_type = check['type']
|
|
check_name = check['name']
|
|
|
|
try:
|
|
if check_type == 'contains':
|
|
results[check_name] = check['value'] in output
|
|
|
|
elif check_type == 'not_contains':
|
|
results[check_name] = check['value'] not in output
|
|
|
|
elif check_type == 'file_exists':
|
|
results[check_name] = os.path.exists(check['path'])
|
|
|
|
elif check_type == 'command_success':
|
|
cmd_result = subprocess.run(
|
|
check['command'],
|
|
shell=True,
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
results[check_name] = cmd_result.returncode == 0
|
|
|
|
elif check_type == 'json_valid':
|
|
# FIX: Validate specific JSON file if path provided, otherwise validate output
|
|
if 'path' in check:
|
|
if os.path.exists(check['path']):
|
|
with open(check['path'], 'r') as f:
|
|
json.load(f) # Raises JSONDecodeError if invalid
|
|
results[check_name] = True
|
|
else:
|
|
results[check_name] = False
|
|
else:
|
|
# Fallback: validate output (likely to fail for agent logs)
|
|
json.loads(output)
|
|
results[check_name] = True
|
|
|
|
elif check_type == 'step_count':
|
|
step_count = len([l for l in output.split('\\n') if l.strip()])
|
|
results[check_name] = step_count >= check['min_steps']
|
|
|
|
elif check_type == 'regex_match':
|
|
results[check_name] = bool(re.search(check['pattern'], output))
|
|
|
|
elif check_type == 'line_count':
|
|
if os.path.exists(check['path']):
|
|
with open(check['path'], 'r') as f:
|
|
count = len(f.readlines())
|
|
results[check_name] = count >= check.get('min', 0) and count <= check.get('max', float('inf'))
|
|
else:
|
|
results[check_name] = False
|
|
|
|
except json.JSONDecodeError:
|
|
results[check_name] = False
|
|
except Exception:
|
|
results[check_name] = False
|
|
|
|
return results
|
|
|
|
def run_test(self, test: TestCase) -> TestResult:
|
|
"""Execute a single test case"""
|
|
self.cleanup_directory()
|
|
print(f"\n{'═'*70}")
|
|
print(f"[{test.id}] {test.name}")
|
|
print(f"{'═'*70}")
|
|
print(f"Complexity: {test.complexity.name} ({test.complexity.value}/5)")
|
|
print(f"Timeout: {test.timeout_seconds}s")
|
|
print(f"\nTask Description:")
|
|
print(f" {test.task_description}")
|
|
print(f"\nExpected Steps: {' → '.join(test.expected_steps)}")
|
|
print(f"\n{'─'*70}")
|
|
print("Agent Output (live stream):")
|
|
print(f"{'─'*70}")
|
|
|
|
output, exec_time, tail = self.execute_agent_task(
|
|
test.task_description,
|
|
test.timeout_seconds
|
|
)
|
|
|
|
print(f"{'─'*70}")
|
|
print(f"\nOutput Tail (last {self.tail_lines} lines):")
|
|
print(f"{'─'*70}")
|
|
for i, line in enumerate(tail, 1):
|
|
print(f" {i:2d} │ {line}")
|
|
print(f"{'─'*70}")
|
|
|
|
verification_results = self.verify_output(output, test.verification_checks)
|
|
|
|
# Determine success based on criteria
|
|
success = all(verification_results.values())
|
|
if test.success_criteria.get('min_verifications'):
|
|
passed = sum(verification_results.values())
|
|
required = test.success_criteria['min_verifications']
|
|
success = passed >= required
|
|
|
|
result = TestResult(
|
|
test_id=test.id,
|
|
success=success,
|
|
execution_time=exec_time,
|
|
steps_completed=[],
|
|
verification_results=verification_results,
|
|
agent_output=output[:1000],
|
|
output_tail=tail
|
|
)
|
|
|
|
self.results.append(result)
|
|
|
|
print(f"\nVerification Results:")
|
|
for check_name, passed in verification_results.items():
|
|
status = "✓ PASS" if passed else "✗ FAIL"
|
|
print(f" {status}: {check_name}")
|
|
|
|
print(f"\n{'═'*70}")
|
|
print(f"Result: {'✓ SUCCESS' if success else '✗ FAILED'}")
|
|
print(f"Execution time: {exec_time:.2f}s")
|
|
print(f"Verifications: {sum(verification_results.values())}/{len(verification_results)} passed")
|
|
print(f"{'═'*70}")
|
|
if not success:
|
|
print("FAILURE OF TESTS, NOT ALL TESTS COMPLETED SUCCESFULLY. PLEASE OPTIMIZE THE AGENT EXECUTING THIS.")
|
|
exit()
|
|
return result
|
|
|
|
def save_results(self):
|
|
"""Save test results to JSON file"""
|
|
output = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'total_tests': len(self.results),
|
|
'passed': sum(1 for r in self.results if r.success),
|
|
'failed': sum(1 for r in self.results if not r.success),
|
|
'average_execution_time': sum(r.execution_time for r in self.results) / len(self.results) if self.results else 0,
|
|
'results': [
|
|
{
|
|
'test_id': r.test_id,
|
|
'success': r.success,
|
|
'execution_time': r.execution_time,
|
|
'verifications': r.verification_results,
|
|
'output_preview': r.agent_output,
|
|
'output_tail': r.output_tail
|
|
}
|
|
for r in self.results
|
|
]
|
|
}
|
|
|
|
with open(self.log_file, 'w') as f:
|
|
json.dump(output, f, indent=2)
|
|
|
|
print(f"\n{'='*70}")
|
|
print(f"TEST RESULTS SUMMARY")
|
|
print(f"{'='*70}")
|
|
print(f"Total Tests: {output['total_tests']}")
|
|
print(f"Passed: {output['passed']} ({output['passed']/output['total_tests']*100:.1f}%)")
|
|
print(f"Failed: {output['failed']}")
|
|
print(f"Average Execution Time: {output['average_execution_time']:.2f}s")
|
|
print(f"Results saved to: {self.log_file}")
|
|
print(f"{'='*70}")
|
|
|
|
# Define Test Suite
# Fifteen TestCases, ordered by complexity tier (BASIC → EXTREME). Each case
# supplies the agent prompt, verification checks (see AgentTester.verify_output
# for check types), a timeout, and the minimum number of checks that must pass.
TEST_SUITE = [
    # BASIC COMPLEXITY
    TestCase(
        id="T001",
        name="Simple File Creation",
        complexity=TaskComplexity.BASIC,
        task_description="Create a file named test_output.txt with the text 'Hello World'",
        verification_checks=[
            {'type': 'file_exists', 'name': 'file_created', 'path': 'test_output.txt'},
            {'type': 'command_success', 'name': 'content_correct',
             'command': 'grep -q "Hello World" test_output.txt'}
        ],
        timeout_seconds=30,
        expected_steps=['create_file', 'write_content'],
        success_criteria={'min_verifications': 2}
    ),

    TestCase(
        id="T002",
        name="Directory Operations with File Hierarchy",
        complexity=TaskComplexity.BASIC,
        task_description="Create a directory called test_dir with two subdirectories named src and docs, create three empty files inside src named file1.txt, file2.txt, file3.txt, and create a README.md in docs with the text 'Documentation folder', then list the entire directory tree",
        verification_checks=[
            {'type': 'file_exists', 'name': 'dir_exists', 'path': 'test_dir'},
            {'type': 'file_exists', 'name': 'src_exists', 'path': 'test_dir/src'},
            {'type': 'file_exists', 'name': 'docs_exists', 'path': 'test_dir/docs'},
            {'type': 'file_exists', 'name': 'file1_exists', 'path': 'test_dir/src/file1.txt'},
            {'type': 'file_exists', 'name': 'file2_exists', 'path': 'test_dir/src/file2.txt'},
            {'type': 'file_exists', 'name': 'file3_exists', 'path': 'test_dir/src/file3.txt'},
            {'type': 'file_exists', 'name': 'readme_exists', 'path': 'test_dir/docs/README.md'},
        ],
        timeout_seconds=45,
        expected_steps=['mkdir_nested', 'touch_files', 'create_readme', 'tree_list'],
        success_criteria={'min_verifications': 6}
    ),

    # INTERMEDIATE COMPLEXITY
    TestCase(
        id="T003",
        name="Advanced Data Processing with Statistics",
        complexity=TaskComplexity.INTERMEDIATE,
        task_description="Create a CSV file with 10 rows of sample employee data (name,age,city,salary) named data.csv, then read it and calculate the average age, median salary, and count of employees per city, and write detailed statistics to summary.txt with proper formatting",
        verification_checks=[
            {'type': 'file_exists', 'name': 'csv_created', 'path': 'data.csv'},
            {'type': 'file_exists', 'name': 'summary_created', 'path': 'summary.txt'},
            # 10 data rows plus optional header/trailing newline slack.
            {'type': 'line_count', 'name': 'csv_has_rows', 'path': 'data.csv', 'min': 10, 'max': 12},
            {'type': 'command_success', 'name': 'summary_has_stats',
             'command': 'grep -iqE "(average|median|count)" summary.txt'},
            {'type': 'command_success', 'name': 'summary_has_numbers',
             'command': 'grep -qE "[0-9]+" summary.txt'}
        ],
        timeout_seconds=75,
        expected_steps=['create_csv', 'write_data', 'read_csv', 'calculate_stats', 'format_summary', 'write_summary'],
        success_criteria={'min_verifications': 4}
    ),

    TestCase(
        id="T004",
        name="API Request with Data Transformation and Caching",
        complexity=TaskComplexity.INTERMEDIATE,
        task_description="Make a GET request to https://api.github.com/repos/torvalds/linux, extract the stargazers_count, forks_count, and open_issues_count fields, calculate the engagement ratio (stars/forks), save raw JSON to cache.json, and create a formatted report in repo_stats.txt with all metrics",
        verification_checks=[
            {'type': 'file_exists', 'name': 'cache_exists', 'path': 'cache.json'},
            {'type': 'file_exists', 'name': 'stats_exists', 'path': 'repo_stats.txt'},
            {'type': 'json_valid', 'name': 'valid_cache_json', 'path': 'cache.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'has_metrics',
             'command': 'grep -iqE "(stars|forks|ratio)" repo_stats.txt'},
        ],
        timeout_seconds=60,
        expected_steps=['api_request', 'parse_json', 'extract_fields', 'calculate_ratio', 'cache_data', 'write_report'],
        success_criteria={'min_verifications': 3}
    ),

    TestCase(
        id="T005",
        name="Web Scraping with Retry Logic",
        complexity=TaskComplexity.INTERMEDIATE,
        task_description="Fetch the public API at https://jsonplaceholder.typicode.com/users, extract all email domains, count occurrences of each domain, sort by frequency, and save to domains.txt. If the request fails, retry up to 3 times with exponential backoff",
        verification_checks=[
            {'type': 'file_exists', 'name': 'output_exists', 'path': 'domains.txt'},
            {'type': 'command_success', 'name': 'has_domains',
             'command': 'grep -qE "@" domains.txt'},
            {'type': 'command_success', 'name': 'has_counts',
             'command': 'grep -qE "[0-9]+" domains.txt'},
        ],
        timeout_seconds=90,
        expected_steps=['api_request', 'parse_users', 'extract_domains', 'count_frequency', 'sort_results', 'write_output'],
        success_criteria={'min_verifications': 2}
    ),

    # ADVANCED COMPLEXITY
    TestCase(
        id="T006",
        name="Conditional Logic with Nested Error Handling",
        complexity=TaskComplexity.ADVANCED,
        task_description="Try to read a file called config.json. If it doesn't exist, create it with default configuration {'debug': true, 'timeout': 30, 'retry': 3}. Then validate the JSON structure, check if all required keys exist, append a timestamp field, and create a backup file config.backup.json. Write validation results to validation.log",
        verification_checks=[
            {'type': 'file_exists', 'name': 'config_exists', 'path': 'config.json'},
            {'type': 'file_exists', 'name': 'backup_exists', 'path': 'config.backup.json'},
            {'type': 'file_exists', 'name': 'log_exists', 'path': 'validation.log'},
            {'type': 'json_valid', 'name': 'valid_json', 'path': 'config.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'has_timestamp',
             'command': 'grep -q "timestamp" config.json'}
        ],
        timeout_seconds=60,
        expected_steps=['check_file', 'create_default', 'validate_structure', 'append_timestamp', 'create_backup', 'log_validation'],
        success_criteria={'min_verifications': 4}
    ),

    TestCase(
        id="T007",
        name="Multi-Format Data Pipeline with Transformations",
        complexity=TaskComplexity.ADVANCED,
        task_description="Create a JSON file with 5 product entries (id, name, price, category, stock), convert it to CSV format, filter products where stock > 0, apply a 10% discount to all prices, convert the result to a markdown table with formatted prices ($XX.XX), and save it to products_report.md. Also generate a JSON summary with total_products, total_value, and categories array",
        verification_checks=[
            {'type': 'file_exists', 'name': 'json_exists', 'path': 'products.json'},
            {'type': 'file_exists', 'name': 'csv_exists', 'path': 'products.csv'},
            {'type': 'file_exists', 'name': 'markdown_exists', 'path': 'products_report.md'},
            {'type': 'file_exists', 'name': 'summary_exists', 'path': 'summary.json'},
            {'type': 'command_success', 'name': 'markdown_has_table',
             'command': 'grep -q "|" products_report.md'},
            {'type': 'command_success', 'name': 'has_dollar_signs',
             'command': 'grep -q "$" products_report.md'},
            {'type': 'json_valid', 'name': 'valid_summary_json', 'path': 'summary.json'}  # FIXED: Added path
        ],
        timeout_seconds=120,
        expected_steps=['create_json', 'json_to_csv', 'filter_stock', 'apply_discount', 'format_prices', 'csv_to_markdown', 'generate_summary'],
        success_criteria={'min_verifications': 5}
    ),

    TestCase(
        id="T008",
        name="Parallel Data Processing with Aggregation",
        complexity=TaskComplexity.ADVANCED,
        task_description="Fetch data from https://jsonplaceholder.typicode.com/posts and https://jsonplaceholder.typicode.com/comments simultaneously, join them based on postId, count comments per post, identify the top 5 most commented posts, and create a detailed HTML report (report.html) with a table and summary statistics",
        verification_checks=[
            {'type': 'file_exists', 'name': 'report_exists', 'path': 'report.html'},
            {'type': 'command_success', 'name': 'has_table',
             'command': 'grep -q "<table>" report.html'},
            {'type': 'command_success', 'name': 'has_html_structure',
             'command': 'grep -q "</html>" report.html'},
            {'type': 'command_success', 'name': 'has_comments_data',
             'command': 'grep -qE "comment" report.html'},
        ],
        timeout_seconds=150,
        expected_steps=['parallel_fetch', 'join_data', 'count_comments', 'find_top_5', 'generate_html', 'write_report'],
        success_criteria={'min_verifications': 3}
    ),

    # EXPERT COMPLEXITY
    TestCase(
        id="T009",
        name="Multi-Stage Data Pipeline with Error Recovery",
        complexity=TaskComplexity.EXPERT,
        task_description="Create a complete data pipeline: 1) Download data from https://jsonplaceholder.typicode.com/posts, 2) Filter posts with userId=1, 3) Extract titles and bodies, 4) Calculate word count for each, 5) Sort by word count descending, 6) Save to processed_posts.txt, 7) Create a summary.json with total_posts, average_word_count, longest_title, and first 3 titles, 8) Generate a CSV with columns: id, title_length, body_word_count, 9) Create execution_log.txt documenting each pipeline stage with timestamps",
        verification_checks=[
            {'type': 'file_exists', 'name': 'processed_exists', 'path': 'processed_posts.txt'},
            {'type': 'file_exists', 'name': 'summary_exists', 'path': 'summary.json'},
            {'type': 'file_exists', 'name': 'csv_exists', 'path': 'pipeline_data.csv'},
            {'type': 'file_exists', 'name': 'log_exists', 'path': 'execution_log.txt'},
            {'type': 'json_valid', 'name': 'valid_json', 'path': 'summary.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'has_posts',
             'command': 'test $(wc -l < processed_posts.txt) -ge 5'},
            {'type': 'command_success', 'name': 'csv_has_header',
             'command': 'head -1 pipeline_data.csv | grep -q ","'},
            {'type': 'command_success', 'name': 'log_has_timestamps',
             'command': 'grep -qE "[0-9]{4}-[0-9]{2}-[0-9]{2}" execution_log.txt'},
        ],
        timeout_seconds=180,
        expected_steps=['download', 'filter', 'extract', 'calculate_words', 'sort', 'save', 'create_summary', 'generate_csv', 'log_execution'],
        success_criteria={'min_verifications': 6}
    ),

    TestCase(
        id="T010",
        name="Self-Correcting Script with Comprehensive Error Handling",
        complexity=TaskComplexity.EXPERT,
        task_description="Create a Python script named safe_calculator.py that: 1) Reads two numbers from input_data.txt (one per line), 2) Performs division, multiplication, and power operations, 3) Handles FileNotFoundError by creating input_data.txt with default values [10, 2], 4) Handles ZeroDivisionError gracefully, 5) Handles ValueError for non-numeric input, 6) Writes results to results.txt, 7) Writes detailed error log to error.log with timestamps and stack traces, 8) Includes unit tests in the script that can be run with pytest",
        verification_checks=[
            {'type': 'file_exists', 'name': 'script_created', 'path': 'safe_calculator.py'},
            {'type': 'command_success', 'name': 'script_runnable',
             'command': 'python3 -m py_compile safe_calculator.py'},
            {'type': 'command_success', 'name': 'has_error_handling',
             'command': 'grep -q "except" safe_calculator.py'},
            {'type': 'command_success', 'name': 'has_logging',
             'command': 'grep -qE "(logging|error)" safe_calculator.py'},
        ],
        timeout_seconds=120,
        expected_steps=['create_script', 'add_file_handling', 'add_zero_division', 'add_value_error', 'add_logging', 'add_tests', 'test_execution'],
        success_criteria={'min_verifications': 3}
    ),

    TestCase(
        id="T011",
        name="Repository Analysis with Statistical Modeling",
        complexity=TaskComplexity.EXPERT,
        task_description="Analyze the current directory structure: 1) Find all Python files recursively, 2) Count total lines, comment lines, and code lines in each, 3) Calculate complexity metrics (functions per file, average function length), 4) Identify files with highest complexity, 5) Create detailed_report.txt with per-file analysis, 6) Create metrics.csv with columns: filename, total_lines, code_lines, comment_ratio, function_count, 7) Create summary.json with aggregate statistics and recommendations, 8) Generate a bar chart data file (visualization_data.csv) suitable for plotting",
        verification_checks=[
            {'type': 'file_exists', 'name': 'report_exists', 'path': 'detailed_report.txt'},
            {'type': 'file_exists', 'name': 'metrics_exists', 'path': 'metrics.csv'},
            {'type': 'file_exists', 'name': 'summary_exists', 'path': 'summary.json'},
            {'type': 'file_exists', 'name': 'viz_exists', 'path': 'visualization_data.csv'},
            {'type': 'json_valid', 'name': 'valid_summary_json', 'path': 'summary.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'csv_has_header',
             'command': 'head -1 metrics.csv | grep -q "filename"'},
            {'type': 'command_success', 'name': 'report_has_analysis',
             'command': 'grep -qE "(lines|functions|complexity)" detailed_report.txt'},
        ],
        timeout_seconds=150,
        expected_steps=['scan_directory', 'find_python_files', 'analyze_each_file', 'calculate_metrics', 'identify_complex', 'create_report', 'generate_csv', 'create_summary', 'generate_viz_data'],
        success_criteria={'min_verifications': 5}
    ),

    TestCase(
        id="T012",
        name="Distributed Task Simulation with State Management",
        complexity=TaskComplexity.EXPERT,
        task_description="Simulate a distributed job queue: 1) Create 10 'job' files (job_1.txt to job_10.txt) with random task descriptions, 2) Process each job sequentially, simulating work with sleep, 3) Track state in state.json (pending, processing, completed), 4) Handle 'failures' for jobs 3 and 7 (retry up to 3 times), 5) Log all state transitions to transitions.log with timestamps, 6) Create final_report.txt with success/failure counts, total processing time, and retry statistics, 7) Clean up successful job files but keep failed ones",
        verification_checks=[
            {'type': 'file_exists', 'name': 'state_exists', 'path': 'state.json'},
            {'type': 'file_exists', 'name': 'log_exists', 'path': 'transitions.log'},
            {'type': 'file_exists', 'name': 'report_exists', 'path': 'final_report.txt'},
            {'type': 'json_valid', 'name': 'valid_state_json', 'path': 'state.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'has_transitions',
             'command': 'grep -qE "(pending|processing|completed)" transitions.log'},
            {'type': 'command_success', 'name': 'has_statistics',
             'command': 'grep -qE "(success|failure|retry)" final_report.txt'},
        ],
        timeout_seconds=200,
        expected_steps=['create_jobs', 'init_state', 'process_queue', 'handle_failures', 'retry_logic', 'log_transitions', 'generate_report', 'cleanup'],
        success_criteria={'min_verifications': 4}
    ),

    # EXTREME COMPLEXITY
    TestCase(
        id="T013",
        name="Full-Stack Data Application with ETL Pipeline",
        complexity=TaskComplexity.EXTREME,
        task_description="Build a complete ETL system: 1) Extract data from multiple APIs (GitHub repos, JSONPlaceholder posts/users), 2) Transform data by normalizing structures, joining related data, calculating derived metrics, 3) Load into an SQLite database with proper schema (tables: repositories, posts, users, metrics), 4) Create database indexes for performance, 5) Generate SQL views for common queries, 6) Export aggregated data to multiple formats (JSON, CSV, Markdown report), 7) Create a Python query script (query_db.py) with functions to search the database, 8) Generate comprehensive documentation (README.md) with schema diagram and usage examples, 9) Create validation tests and execution log",
        verification_checks=[
            {'type': 'file_exists', 'name': 'db_exists', 'path': 'data.db'},
            {'type': 'file_exists', 'name': 'query_script_exists', 'path': 'query_db.py'},
            {'type': 'file_exists', 'name': 'readme_exists', 'path': 'README.md'},
            {'type': 'file_exists', 'name': 'json_export_exists', 'path': 'export_data.json'},
            {'type': 'file_exists', 'name': 'csv_export_exists', 'path': 'export_data.csv'},
            {'type': 'command_success', 'name': 'db_has_tables',
             'command': 'sqlite3 data.db ".tables" | grep -q "repositories"'},
            {'type': 'command_success', 'name': 'script_runnable',
             'command': 'python3 -m py_compile query_db.py'},
            {'type': 'command_success', 'name': 'readme_has_schema',
             'command': 'grep -qE "(schema|table|database)" README.md'},
        ],
        timeout_seconds=300,
        expected_steps=['extract_apis', 'transform_data', 'create_schema', 'load_database', 'create_indexes', 'create_views', 'export_formats', 'create_query_script', 'generate_docs', 'validate'],
        success_criteria={'min_verifications': 6}
    ),

    TestCase(
        id="T014",
        name="Autonomous Code Refactoring Agent",
        complexity=TaskComplexity.EXTREME,
        task_description="Create a code analysis and refactoring system: 1) Scan all Python files in current directory, 2) Identify code smells (long functions >50 lines, deep nesting >3 levels, duplicate code blocks), 3) Generate refactoring suggestions for each file, 4) Create refactored versions with suffix '_refactored.py', 5) Run automated tests to ensure functionality preserved, 6) Generate side-by-side diff reports (diff_report.html), 7) Calculate and compare complexity metrics before/after, 8) Create improvement_summary.json with metrics improvements, 9) Document refactoring patterns applied in patterns.md, 10) Generate rollback script (rollback.sh)",
        verification_checks=[
            {'type': 'file_exists', 'name': 'diff_report_exists', 'path': 'diff_report.html'},
            {'type': 'file_exists', 'name': 'summary_exists', 'path': 'improvement_summary.json'},
            {'type': 'file_exists', 'name': 'patterns_exists', 'path': 'patterns.md'},
            {'type': 'file_exists', 'name': 'rollback_exists', 'path': 'rollback.sh'},
            {'type': 'json_valid', 'name': 'valid_summary_json', 'path': 'improvement_summary.json'},  # FIXED: Added path
            {'type': 'command_success', 'name': 'has_html_structure',
             'command': 'grep -q "<html>" diff_report.html'},
            {'type': 'command_success', 'name': 'patterns_has_examples',
             'command': 'grep -qE "(before|after|pattern)" patterns.md'},
        ],
        timeout_seconds=400,
        expected_steps=['scan_files', 'detect_smells', 'generate_suggestions', 'refactor_code', 'run_tests', 'create_diffs', 'calculate_metrics', 'generate_summary', 'document_patterns', 'create_rollback'],
        success_criteria={'min_verifications': 5}
    ),

    TestCase(
        id="T015",
        name="Intelligent Testing Framework Generator",
        complexity=TaskComplexity.EXTREME,
        task_description="Build a meta-testing system: 1) Analyze all Python modules in current directory, 2) Extract functions and their signatures, 3) Infer parameter types and generate test cases, 4) Create pytest test files for each module (test_*.py), 5) Generate fixtures for common data types, 6) Create parametrized tests for edge cases (empty, null, boundary values), 7) Add mocking for external dependencies, 8) Generate test coverage report (coverage.html), 9) Create CI/CD configuration (.github/workflows/test.yml), 10) Generate comprehensive test documentation (test_guide.md) with examples",
        verification_checks=[
            {'type': 'file_exists', 'name': 'coverage_exists', 'path': 'coverage.html'},
            {'type': 'file_exists', 'name': 'ci_config_exists', 'path': '.github/workflows/test.yml'},
            {'type': 'file_exists', 'name': 'test_guide_exists', 'path': 'test_guide.md'},
            {'type': 'command_success', 'name': 'has_test_files',
             'command': 'ls test_*.py 2>/dev/null | head -1'},
            {'type': 'command_success', 'name': 'tests_runnable',
             'command': 'python3 -m py_compile test_*.py 2>/dev/null'},
            {'type': 'command_success', 'name': 'has_fixtures',
             'command': 'grep -q "@pytest.fixture" test_*.py 2>/dev/null'},
        ],
        timeout_seconds=350,
        expected_steps=['analyze_modules', 'extract_functions', 'infer_types', 'generate_tests', 'create_fixtures', 'add_parametrized', 'add_mocking', 'run_coverage', 'generate_ci_config', 'create_docs'],
        success_criteria={'min_verifications': 4}
    ),
]
|
|
|
|
def main():
    """Entry point: print the suite overview, run every test, save results."""
    tester = AgentTester(tail_lines=20)
    banner = "=" * 70

    print(banner)
    print("AUTONOMOUS AGENT COMPLEX TASK TEST SUITE")
    print(banner)
    print(f"Total test cases: {len(TEST_SUITE)}")
    print(f"Complexity levels: BASIC (1), INTERMEDIATE (2), ADVANCED (3), EXPERT (4), EXTREME (5)")
    print(f"Output tail length: {tester.tail_lines} lines")
    print(banner)

    # Bucket the suite by complexity tier name for the distribution table.
    grouped = {}
    for case in TEST_SUITE:
        grouped.setdefault(case.complexity.name, []).append(case)

    print("\nTest Distribution:")
    for tier in ('BASIC', 'INTERMEDIATE', 'ADVANCED', 'EXPERT', 'EXTREME'):
        count = len(grouped.get(tier, []))
        print(f" {tier}: {count} tests")
    print(banner)

    # Run every case; Ctrl-C stops the suite, any other error skips the case.
    for case in TEST_SUITE:
        try:
            tester.run_test(case)
            time.sleep(3)  # Pause between tests
        except KeyboardInterrupt:
            print("\n\nTest suite interrupted by user")
            break
        except Exception as e:
            print(f"ERROR running test {case.id}: {e}")
            import traceback
            traceback.print_exc()
            continue

    tester.save_results()
|
|
|
|
# Run the suite only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|