#!/usr/bin/env python3
"""Async benchmark client for the /render endpoint."""

import argparse
import asyncio
import json
import statistics
import time
from collections import Counter

import aiohttp

DEFAULT_URL = "http://127.0.0.1:8083/render"
DEFAULT_BODY = {
    "template": "hello.txt",
    "context": {
        "name": "Retoor",
        "stats": {"notifications": 3},
        "repos": [{"name": "snek"}],
    },
}


def percentile(values, p):
    """Percentile with linear interpolation; p is in the range 0..100."""
    if not values:
        return float("nan")
    s = sorted(values)
    k = (len(s) - 1) * (p / 100.0)
    f = int(k)
    c = min(f + 1, len(s) - 1)
    if f == c:
        return s[f]
    d0 = s[f] * (c - k)
    d1 = s[c] * (k - f)
    return d0 + d1


async def worker(name, session, url, payload_bytes, headers, queue, results, failures):
    while True:
        idx = await queue.get()
        if idx is None:  # sentinel: no more work
            queue.task_done()
            return
        t0 = time.perf_counter()
        try:
            # aiohttp sets Content-Length automatically for bytes payloads,
            # so there is no need to write it into the shared headers dict.
            async with session.post(url, data=payload_bytes, headers=headers) as resp:
                body = await resp.read()
                dt = time.perf_counter() - t0
                rt_us = resp.headers.get("X-Render-Time-Us")
                results.append((
                    dt,
                    resp.status,
                    len(body),
                    int(rt_us) if rt_us and rt_us.isdigit() else None,
                ))
        except Exception as e:
            failures.append(repr(e))
        finally:
            queue.task_done()


async def run_fixed_requests(url, total, concurrency, timeout, body):
    payload_bytes = json.dumps(body, separators=(",", ":")).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    conn = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency, ttl_dns_cache=300)
    to = aiohttp.ClientTimeout(total=timeout)
    results = []
    failures = []
    queue = asyncio.Queue()
    async with aiohttp.ClientSession(connector=conn, timeout=to) as session:
        # Enqueue all requests up front, then one stop sentinel per worker.
        for i in range(total):
            queue.put_nowait(i)
        for _ in range(concurrency):
            queue.put_nowait(None)
        tasks = [
            asyncio.create_task(worker(f"w{i}", session, url, payload_bytes,
                                       headers, queue, results, failures))
            for i in range(concurrency)
        ]
        t0 = time.perf_counter()
        await queue.join()
        t1 = time.perf_counter()
        for t in tasks:
            await t
    return results, failures, t1 - t0


async def run_duration(url, seconds, concurrency, timeout, body):
    payload_bytes = json.dumps(body, separators=(",", ":")).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    conn = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency, ttl_dns_cache=300)
    to = aiohttp.ClientTimeout(total=timeout)
    results = []
    failures = []
    # Bound the queue so the feeder cannot race arbitrarily far ahead of the
    # workers; with an unbounded queue the feeder would keep enqueueing for
    # the whole duration and queue.join() would then run far past the deadline.
    queue = asyncio.Queue(maxsize=concurrency * 2)
    async with aiohttp.ClientSession(connector=conn, timeout=to) as session:

        async def feeder():
            end = time.perf_counter() + seconds
            i = 0
            while time.perf_counter() < end:
                await queue.put(i)  # blocks (and yields) when the queue is full
                i += 1
            # Stop sentinels, one per worker.
            for _ in range(concurrency):
                await queue.put(None)

        tasks = [
            asyncio.create_task(worker(f"w{i}", session, url, payload_bytes,
                                       headers, queue, results, failures))
            for i in range(concurrency)
        ]
        t0 = time.perf_counter()
        await feeder()
        await queue.join()
        t1 = time.perf_counter()
        for t in tasks:
            await t
    return results, failures, t1 - t0


def summarize(results, failures, wall):
    lat = [dt for (dt, status, _, _) in results if status == 200]
    codes = Counter(status for (_, status, _, _) in results)
    bytes_total = sum(sz for (_, _, sz, _) in results)
    rt_us_vals = [rt for (_, status, _, rt) in results if status == 200 and rt is not None]
    ok = sum(1 for (_, status, _, _) in results if status == 200)
    tot = len(results) + len(failures)
    rps = tot / wall if wall > 0 else 0.0

    print("\n=== Benchmark Summary ===")
    print(f"Total time       : {wall:.3f}s")
    print(f"Requests sent    : {tot}")
sent : {tot}") print(f" 2xx OK : {ok}") print(f" Errors (HTTP) : {sum(v for c,v in codes.items() if c != 200)}") print(f" Failures (I/O) : {len(failures)}") print(f"Throughput : {rps:.1f} req/s") print(f"Transferred : {bytes_total/1024:.1f} KiB") if lat: print("\nLatency (client observed):") print(f" mean : {statistics.mean(lat)*1000:.3f} ms") print(f" median : {statistics.median(lat)*1000:.3f} ms") print(f" p95 : {percentile(lat,95)*1000:.3f} ms") print(f" p99 : {percentile(lat,99)*1000:.3f} ms") if rt_us_vals: print("\nServer render time (X-Render-Time-Us header):") print(f" mean : {statistics.mean(rt_us_vals):.0f} µs") print(f" median : {statistics.median(rt_us_vals):.0f} µs") print(f" p95 : {percentile(rt_us_vals,95):.0f} µs") print(f" p99 : {percentile(rt_us_vals,99):.0f} µs") if codes: print("\nHTTP status codes:") for code, cnt in sorted(codes.items()): print(f" {code}: {cnt}") def main(): ap = argparse.ArgumentParser(description="aiohttp benchmark for /render") ap.add_argument("--url", default=DEFAULT_URL, help="Target URL") ap.add_argument("-c", "--concurrency", type=int, default=32, help="Concurrent workers") group = ap.add_mutually_exclusive_group() group.add_argument("-n", "--requests", type=int, default=1000, help="Total requests (fixed)") group.add_argument("-d", "--duration", type=float, help="Duration in seconds (open loop)") ap.add_argument("-t", "--timeout", type=float, default=10.0, help="Total request timeout (seconds)") ap.add_argument("--print-one", action="store_true", help="Print one sample response body for sanity") args = ap.parse_args() body = DEFAULT_BODY try: if args.duration: results, failures, wall = asyncio.run(run_duration(args.url, args.duration, args.concurrency, args.timeout, body)) else: results, failures, wall = asyncio.run(run_fixed_requests(args.url, args.requests, args.concurrency, args.timeout, body)) except KeyboardInterrupt: print("\nInterrupted.") return # Optional: print one successful response body (sanity) if args.print_one: # do one extra request async def fetch_one(): to = aiohttp.ClientTimeout(total=args.timeout) async with aiohttp.ClientSession(timeout=to) as s: async with s.post(args.url, json=body) as r: print("\n--- Sample response body ---") print(await r.text()) asyncio.run(fetch_one()) summarize(results, failures, wall) if __name__ == "__main__": main()