#!/usr/bin/env python3
# retoor
"""abr: a small asyncio/aiohttp HTTP benchmark inspired by ApacheBench.

Fires a fixed number of GET requests at one URL with bounded concurrency,
shows a live progress line, and prints an ApacheBench-style summary.
"""
import argparse
import asyncio
import signal
import ssl
import statistics
import sys
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import aiohttp

REQUEST_TIMEOUT_S = 30
MAX_CONNECTIONS = 10000


class Style:
    """ANSI escape sequences used to colorize terminal output."""

    RESET = '\033[0m'
    BOLD = '\033[1m'
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    CYAN = '\033[36m'


# Set by the signal handler; polled by main() to stop the benchmark early.
shutdown_requested = False


def signal_handler(sig, frame):
    """Record that SIGINT/SIGTERM was received so main() can stop early."""
    global shutdown_requested
    shutdown_requested = True


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)


def _failure_result(start_time: float, error: str) -> Dict[str, Any]:
    """Build the result record for a request that raised before completing."""
    return {
        "status": None,
        "duration_ms": (time.monotonic() - start_time) * 1000,
        "body_size_bytes": 0,
        "header_size_bytes": 0,
        "server_software": None,
        "failed": True,
        "error": error,
    }


async def fetch(session: aiohttp.ClientSession, semaphore: asyncio.Semaphore,
                url: str, timeout: aiohttp.ClientTimeout) -> Dict[str, Any]:
    """Perform one GET request and return a result record; never raises on I/O errors.

    Args:
        session: Shared client session used to issue the request.
        semaphore: Limits how many requests are in flight at once.
        url: Target URL.
        timeout: Per-request timeout passed to ``session.get``.

    Returns:
        A dict with keys ``status``, ``duration_ms``, ``body_size_bytes``,
        ``header_size_bytes``, ``server_software``, ``failed`` and ``error``.
        HTTP statuses >= 400 are marked ``failed`` with ``error`` set to None;
        timeouts and other exceptions are marked ``failed`` with a message.
    """
    async with semaphore:
        # The clock starts after the semaphore is acquired, so time spent
        # waiting for a free slot is not counted as request latency.
        start_time = time.monotonic()
        try:
            async with session.get(url, timeout=timeout) as response:
                body_bytes = await response.read()
                end_time = time.monotonic()
                # Size estimate from the raw header pairs only: 4 extra bytes
                # per header and 2 at the end (presumably ": " + CRLF and the
                # terminating CRLF). The status line is not included.
                header_size = sum(
                    len(k) + len(v) + 4 for k, v in response.raw_headers
                ) + 2
                return {
                    "status": response.status,
                    "duration_ms": (end_time - start_time) * 1000,
                    "body_size_bytes": len(body_bytes),
                    "header_size_bytes": header_size,
                    "server_software": response.headers.get("Server"),
                    "failed": response.status >= 400,
                    "error": None,
                }
        except asyncio.TimeoutError:
            return _failure_result(
                start_time, f"Request timeout ({REQUEST_TIMEOUT_S}s)"
            )
        except Exception as e:
            # Deliberately broad: one bad request must become a failed sample
            # rather than abort the whole benchmark run.
            return _failure_result(start_time, str(e))


def format_bytes(bytes_val: int) -> str:
    """Format a byte count using binary (1024-based) units up to TB.

    Values below 1024 are shown as whole bytes; larger values use two
    decimals. Anything of 1024 TB or more is still expressed in TB.
    """
    if bytes_val < 1024:
        return f"{bytes_val} bytes"
    units = ("KB", "MB", "GB", "TB")
    value = bytes_val / 1024
    # Only step through the units below the largest one, so that the final
    # fall-through value is really expressed in the largest unit.
    for unit in units[:-1]:
        if value < 1024:
            return f"{value:.2f} {unit}"
        value /= 1024
    return f"{value:.2f} {units[-1]}"


def _percentile(sorted_values: List[float], percent: int) -> float:
    """Return the nearest-rank percentile of an ascending, non-empty list."""
    # Ceiling of n * percent / 100 in integer arithmetic: the smallest rank
    # that covers at least `percent` percent of the samples.
    rank = (len(sorted_values) * percent + 99) // 100
    return sorted_values[max(rank, 1) - 1]


def print_summary(results: List[Dict[str, Any]], total_duration_s: float,
                  url: str, total_requests: int, concurrency: int,
                  total_connections: int):
    """Print an ApacheBench-style report for a finished benchmark run.

    Args:
        results: Result records as produced by ``fetch``.
        total_duration_s: Wall-clock duration of the whole run in seconds.
        url: Benchmarked URL (used for the host, port and path lines).
        total_requests: Number of completed requests (successful or not).
        concurrency: Concurrency level the run was started with.
        total_connections: Connection count to display.

    Latency statistics and transfer totals are computed from successful
    requests only; throughput figures use ``total_requests``. If no request
    succeeded, a short failure notice is printed instead of the report.
    """
    success_results = [r for r in results if not r["failed"]]
    failed_count = len(results) - len(success_results)

    if not success_results:
        print(f"{Style.RED}All requests failed. Cannot generate a detailed summary.{Style.RESET}")
        print(f"Total time: {total_duration_s:.3f} seconds")
        print(f"Failed requests: {failed_count}")
        # HTTP-status failures carry no error text, so look for the first
        # record that actually has one instead of only inspecting the first.
        sample_error = next((r["error"] for r in results if r["error"]), None)
        if sample_error:
            print(f"Sample error: {sample_error}")
        return

    parsed = urlparse(url)
    hostname = parsed.hostname or "unknown"
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path += "?" + parsed.query

    first_result = success_results[0]
    doc_length = first_result["body_size_bytes"]
    request_durations_ms = [r["duration_ms"] for r in success_results]
    total_html_transferred = sum(r["body_size_bytes"] for r in success_results)
    total_transferred = sum(
        r["body_size_bytes"] + r["header_size_bytes"] for r in success_results
    )

    # Guard the rate divisions the same way the live status line does.
    if total_duration_s > 0:
        req_per_second = total_requests / total_duration_s
        transfer_rate_kbytes_s = (total_transferred / 1024) / total_duration_s
    else:
        req_per_second = 0.0
        transfer_rate_kbytes_s = 0.0
    time_per_req_concurrent = (total_duration_s * 1000) / total_requests
    time_per_req_mean = (total_duration_s * 1000 * concurrency) / total_requests

    min_time = min(request_durations_ms)
    mean_time = statistics.mean(request_durations_ms)
    stdev_time = (
        statistics.stdev(request_durations_ms)
        if len(request_durations_ms) > 1 else 0
    )
    median_time = statistics.median(request_durations_ms)
    max_time = max(request_durations_ms)

    sorted_durations = sorted(request_durations_ms)
    percentiles = {
        p: _percentile(sorted_durations, p)
        for p in (50, 66, 75, 80, 90, 95, 98, 99)
    }
    percentiles[100] = max_time

    y, g, r, c, b, rs = (Style.YELLOW, Style.GREEN, Style.RED,
                         Style.CYAN, Style.BOLD, Style.RESET)
    fail_color = g if failed_count == 0 else r

    print(f"{y}Server Software:{rs} {first_result['server_software'] or 'N/A'}")
    print(f"{y}Server Hostname:{rs} {hostname}")
    print(f"{y}Server Port:{rs} {port}\n")
    print(f"{y}Document Path:{rs} {path}")
    print(f"{y}Document Length:{rs} {format_bytes(doc_length)}\n")
    print(f"{y}Concurrency Level:{rs} {concurrency}")
    print(f"{y}Time taken for tests:{rs} {total_duration_s:.3f} seconds")
    print(f"{y}Complete requests:{rs} {total_requests}")
    print(f"{y}Failed requests:{rs} {fail_color}{failed_count}{rs}")
    print(f"{y}Total connections made:{rs} {total_connections}")
    print(f"{y}Total transferred:{rs} {format_bytes(total_transferred)}")
    print(f"{y}HTML transferred:{rs} {format_bytes(total_html_transferred)}")
    print(f"{y}Requests per second:{rs} {g}{req_per_second:.2f}{rs} [#/sec] (mean)")
    print(f"{y}Time per request:{rs} {time_per_req_mean:.3f} [ms] (mean)")
    print(f"{y}Time per request:{rs} {time_per_req_concurrent:.3f} [ms] (mean, across all concurrent requests)")
    print(f"{y}Transfer rate:{rs} {g}{transfer_rate_kbytes_s:.2f}{rs} [Kbytes/sec] received\n")
    print(f"{c}{b}Connection Times (ms){rs}")
    print(f"{c}---------------------{rs}")
    print(f"{'min:':<10}{min_time:>8.0f}")
    print(f"{'mean:':<10}{mean_time:>8.0f}")
    print(f"{'sd:':<10}{stdev_time:>8.1f}")
    print(f"{'median:':<10}{median_time:>8.0f}")
    print(f"{'max:':<10}{max_time:>8.0f}\n")
    print(f"{c}{b}Percentage of the requests served within a certain time (ms){rs}")
    for p, t in percentiles.items():
        print(f" {g}{p:>3}%{rs} {t:.0f}")


async def main(url: str, total_requests: int, concurrency: int,
               keep_alive: bool, insecure: bool):
    """Run the benchmark against ``url`` and print progress and a summary.

    Args:
        url: Target URL.
        total_requests: Number of GET requests to issue.
        concurrency: Maximum number of requests in flight at once.
        keep_alive: Reuse connections when True; otherwise the connector is
            created with ``force_close=True``.
        insecure: Disable TLS certificate and hostname verification.

    Exits the process with status 130 after printing the summary if a
    shutdown signal was received during the run.
    """
    parsed = urlparse(url)
    hostname = parsed.hostname or "unknown"
    print("abr, a Python-based HTTP benchmark inspired by ApacheBench.")
    print(f"Benchmarking {hostname} (be patient)...")
    if insecure and parsed.scheme == "https":
        print(f"{Style.YELLOW}Warning: SSL certificate verification disabled{Style.RESET}")

    semaphore = asyncio.Semaphore(concurrency)
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_S)

    ssl_context: Optional[ssl.SSLContext] = None
    if insecure:
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

    connector = aiohttp.TCPConnector(
        limit=min(concurrency, MAX_CONNECTIONS),
        force_close=not keep_alive,
        ssl=ssl_context,
    )

    # NOTE: incremented once per completed request, so this is a request
    # count rather than a count of distinct TCP connections.
    total_connections = 0

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            asyncio.create_task(fetch(session, semaphore, url, timeout))
            for _ in range(total_requests)
        ]
        results = []
        completed_count = 0
        failed_count = 0
        success_count = 0
        total_duration_ms = 0
        total_bytes_transferred = 0
        benchmark_start_time = time.monotonic()

        for future in asyncio.as_completed(tasks):
            try:
                result = await future
            except asyncio.CancelledError:
                break

            results.append(result)
            completed_count += 1
            total_connections += 1
            total_bytes_transferred += (
                result["body_size_bytes"] + result["header_size_bytes"]
            )
            if result["failed"]:
                failed_count += 1
            else:
                success_count += 1
                total_duration_ms += result["duration_ms"]

            elapsed_time = time.monotonic() - benchmark_start_time
            req_per_sec = completed_count / elapsed_time if elapsed_time > 0 else 0
            avg_latency_ms = total_duration_ms / success_count if success_count > 0 else 0
            transfer_rate_kbs = (
                (total_bytes_transferred / 1024) / elapsed_time
                if elapsed_time > 0 else 0
            )
            fail_color = Style.GREEN if failed_count == 0 else Style.RED
            status_line = (
                f"\r{Style.BOLD}Completed: {completed_count}/{total_requests} | "
                f"Failed: {fail_color}{failed_count}{Style.RESET}{Style.BOLD} | "
                f"RPS: {Style.GREEN}{req_per_sec:.1f}{Style.RESET}{Style.BOLD} | "
                f"Avg Latency: {avg_latency_ms:.0f}ms | "
                f"Rate: {transfer_rate_kbs:.1f} KB/s{Style.RESET}"
            )
            sys.stdout.write(status_line)
            sys.stdout.flush()

            # Checked after the await so that every awaitable handed out by
            # as_completed() is actually awaited before leaving the loop.
            if shutdown_requested:
                break

        benchmark_end_time = time.monotonic()

        # Cancel whatever is still running and wait for it to unwind, so no
        # task is left pending while the session is being closed.
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    if shutdown_requested:
        print(f"\n{Style.YELLOW}Shutdown requested, cleaning up...{Style.RESET}")

    sys.stdout.write("\n\n")
    print(f"{Style.GREEN}{Style.BOLD}Finished {len(results)} requests{Style.RESET}\n")

    total_duration = benchmark_end_time - benchmark_start_time
    print_summary(results, total_duration, url, len(results), concurrency,
                  total_connections)

    if shutdown_requested:
        sys.exit(130)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="A Python-based HTTP benchmark tool inspired by ApacheBench.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-n', type=int, required=True, help='Total number of requests to perform')
    parser.add_argument('-c', type=int, required=True, help='Number of concurrent connections')
    parser.add_argument('-k', action='store_true', help='Enable HTTP Keep-Alive')
    parser.add_argument('-i', action='store_true', help='Insecure mode (skip SSL certificate verification)')
    parser.add_argument('url', type=str, help='URL to benchmark')
    args = parser.parse_args()

    if args.n <= 0:
        parser.error("Number of requests (-n) must be positive")
    if args.c <= 0 or args.c > MAX_CONNECTIONS:
        parser.error(f"Concurrency (-c) must be between 1 and {MAX_CONNECTIONS}")
    if args.n < args.c:
        parser.error("Number of requests (-n) cannot be less than the concurrency level (-c)")

    asyncio.run(main(url=args.url, total_requests=args.n, concurrency=args.c,
                     keep_alive=args.k, insecure=args.i))