|
#!/usr/bin/env python3
|
|
import asyncio
|
|
import aiohttp
|
|
import argparse
|
|
import json
|
|
import time
|
|
import statistics
|
|
from collections import Counter
|
|
|
|
# Target endpoint used when no --url option is given on the command line.
DEFAULT_URL = "http://127.0.0.1:8083/render"

# JSON document that is serialized once and POSTed with every request.
DEFAULT_BODY = {
    "template": "hello.txt",
    "context": {
        "name": "Retoor",
        "stats": {"notifications": 3},
        "repos": [{"name": "snek"}],
    },
}
|
|
|
|
def percentile(values, p):
    """Return the p-th percentile of ``values`` by linear interpolation.

    The input is sorted into a new list; the fractional rank
    ``(len - 1) * p / 100`` is located and the two neighbouring samples are
    blended according to their distance from that rank.

    Args:
        values: Sequence of numbers (need not be sorted).
        p: Percentile to compute, expressed on a 0-100 scale.

    Returns:
        The interpolated percentile, or ``nan`` when ``values`` is empty.
    """
    if not values:
        return float("nan")

    ordered = sorted(values)
    last = len(ordered) - 1
    rank = last * (p / 100.0)
    lo = int(rank)
    hi = min(lo + 1, last)  # clamp so the upper neighbour stays in range

    if lo == hi:
        # Rank landed on the final sample: nothing to interpolate with.
        return ordered[lo]

    lower_part = ordered[lo] * (hi - rank)
    upper_part = ordered[hi] * (rank - lo)
    return lower_part + upper_part
|
|
|
|
async def worker(name, session, url, payload_bytes, headers, queue, results, failures):
    """Consume request tickets from ``queue`` and POST the payload for each.

    The worker loops until it dequeues a ``None`` sentinel. Every dequeued
    item (sentinel included) is acknowledged with ``queue.task_done()`` so
    that a caller blocked in ``queue.join()`` is released.

    Args:
        name: Label for this worker; not used by the body.
        session: Object exposing ``post(url, data=..., headers=...)`` as an
            async context manager (an aiohttp client session in this file).
        url: Target URL for every POST.
        payload_bytes: Pre-encoded request body.
        headers: Base request headers. This mapping is NOT modified; a
            per-worker copy carrying ``Content-Length`` is sent instead.
        queue: ``asyncio.Queue`` of tickets; ``None`` means "stop".
        results: List receiving one tuple per completed response:
            ``(elapsed_seconds, status, body_length, render_time_us_or_None)``.
        failures: List receiving ``repr(exc)`` for every request that raised.
    """
    # Build the outgoing headers once. The original wrote Content-Length into
    # the caller's dict on every request, mutating state shared by all workers.
    request_headers = dict(headers or {})
    request_headers["Content-Length"] = str(len(payload_bytes))

    while True:
        idx = await queue.get()
        if idx is None:
            queue.task_done()
            return

        t0 = time.perf_counter()
        try:
            async with session.post(url, data=payload_bytes, headers=request_headers) as resp:
                body = await resp.read()
                # Latency covers sending the request and reading the full body.
                dt = time.perf_counter() - t0
                rt_us = resp.headers.get("X-Render-Time-Us")
                results.append((
                    dt,
                    resp.status,
                    len(body),
                    int(rt_us) if rt_us and rt_us.isdigit() else None,
                ))
        except Exception as e:
            # Best effort by design: a failed request is recorded, never fatal,
            # so the remaining tickets are still processed.
            failures.append(repr(e))
        finally:
            queue.task_done()
|
|
|
|
async def run_fixed_requests(url, total, concurrency, timeout, body):
    """Send ``total`` POST requests using ``concurrency`` parallel workers.

    All tickets are queued up front, followed by one ``None`` sentinel per
    worker so each worker stops after the real work has been consumed.

    Args:
        url: Target URL.
        total: Number of requests to send.
        concurrency: Number of worker tasks and the connection limit.
        timeout: Total per-request timeout in seconds.
        body: JSON-serializable request body.

    Returns:
        ``(results, failures, wall_seconds)`` where ``results`` and
        ``failures`` are the lists filled by the workers and
        ``wall_seconds`` spans from worker start-up until the queue drained.

    Raises:
        ValueError: If ``concurrency`` is less than 1. Without any worker
            the queued tickets would never be consumed and ``queue.join()``
            would block forever.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")

    payload_bytes = json.dumps(body, separators=(",", ":")).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    conn = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency, ttl_dns_cache=300)
    to = aiohttp.ClientTimeout(total=timeout)
    results = []
    failures = []
    queue = asyncio.Queue()

    async with aiohttp.ClientSession(connector=conn, timeout=to) as session:
        # enqueue all requests
        for i in range(total):
            queue.put_nowait(i)
        # sentinels to stop workers
        for _ in range(concurrency):
            queue.put_nowait(None)

        tasks = [
            asyncio.create_task(worker(f"w{i}", session, url, payload_bytes, headers, queue, results, failures))
            for i in range(concurrency)
        ]
        # Tasks only start running at the first await below, so t0 marks the
        # real start of the measured window.
        t0 = time.perf_counter()
        await queue.join()
        t1 = time.perf_counter()
        # Every worker has consumed its sentinel by now; collect them so any
        # unexpected task exception surfaces here.
        await asyncio.gather(*tasks)

    return results, failures, t1 - t0
|
|
|
|
async def run_duration(url, seconds, concurrency, timeout, body):
    """Keep ``concurrency`` workers busy with POST requests for ``seconds``.

    A feeder coroutine hands tickets to the workers through a bounded queue
    until the deadline passes, then queues one ``None`` sentinel per worker.

    The queue is bounded to ``concurrency`` entries on purpose: the previous
    unbounded ``put_nowait`` loop queued tickets far faster than they could
    be served, so after the deadline ``queue.join()`` still had to drain the
    whole backlog and the run lasted much longer than ``seconds`` while the
    queue grew without limit. With back-pressure, at most ``concurrency``
    queued plus ``concurrency`` in-flight requests remain at the deadline.

    Args:
        url: Target URL.
        seconds: How long new requests keep being issued.
        concurrency: Number of worker tasks and the connection limit.
        timeout: Total per-request timeout in seconds.
        body: JSON-serializable request body.

    Returns:
        ``(results, failures, wall_seconds)`` where ``wall_seconds`` spans
        from feeder start until the queue has fully drained.

    Raises:
        ValueError: If ``concurrency`` is less than 1 (no worker would ever
            consume the queue, so the call could never finish).
    """
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")

    payload_bytes = json.dumps(body, separators=(",", ":")).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    conn = aiohttp.TCPConnector(limit=concurrency, limit_per_host=concurrency, ttl_dns_cache=300)
    to = aiohttp.ClientTimeout(total=timeout)
    results = []
    failures = []
    queue = asyncio.Queue(maxsize=concurrency)

    async with aiohttp.ClientSession(connector=conn, timeout=to) as session:

        async def feeder():
            end = time.perf_counter() + seconds
            i = 0
            while time.perf_counter() < end:
                # Suspends while the queue is full, which both yields control
                # to the workers and caps the backlog.
                await queue.put(i)
                i += 1
            # sentinels to stop workers (FIFO: they follow all real tickets)
            for _ in range(concurrency):
                await queue.put(None)

        tasks = [
            asyncio.create_task(worker(f"w{i}", session, url, payload_bytes, headers, queue, results, failures))
            for i in range(concurrency)
        ]
        t0 = time.perf_counter()
        await feeder()
        await queue.join()
        t1 = time.perf_counter()
        await asyncio.gather(*tasks)

    return results, failures, t1 - t0
|
|
|
|
def summarize(results, failures, wall):
    """Print a human-readable benchmark report to stdout.

    A response counts as successful when its status is in the 2xx range,
    matching the "2xx OK" line of the report. Previously only status 200 was
    counted, so e.g. 201/204 responses were reported as HTTP errors and left
    out of the latency statistics.

    Args:
        results: Iterable-compatible list of
            ``(elapsed_seconds, status, body_length, render_time_us_or_None)``
            tuples, one per completed response.
        failures: List with one entry per request that raised instead of
            producing a response.
        wall: Wall-clock duration of the run in seconds; a non-positive
            value yields a throughput of 0.0 instead of dividing by zero.
    """
    successes = [row for row in results if 200 <= row[1] < 300]
    lat = [dt for (dt, _, _, _) in successes]
    rt_us_vals = [rt for (_, _, _, rt) in successes if rt is not None]
    codes = Counter(status for (_, status, _, _) in results)
    bytes_total = sum(sz for (_, _, sz, _) in results)

    ok = len(successes)
    http_errors = len(results) - ok
    # Every attempted request ended up in exactly one of the two lists.
    tot = len(results) + len(failures)
    rps = tot / wall if wall > 0 else 0.0

    print("\n=== Benchmark Summary ===")
    print(f"Total time : {wall:.3f}s")
    print(f"Requests sent : {tot}")
    print(f" 2xx OK : {ok}")
    print(f" Errors (HTTP) : {http_errors}")
    print(f" Failures (I/O) : {len(failures)}")
    print(f"Throughput : {rps:.1f} req/s")
    print(f"Transferred : {bytes_total/1024:.1f} KiB")

    if lat:
        print("\nLatency (client observed):")
        print(f" mean : {statistics.mean(lat)*1000:.3f} ms")
        print(f" median : {statistics.median(lat)*1000:.3f} ms")
        print(f" p95 : {percentile(lat,95)*1000:.3f} ms")
        print(f" p99 : {percentile(lat,99)*1000:.3f} ms")

    if rt_us_vals:
        print("\nServer render time (X-Render-Time-Us header):")
        print(f" mean : {statistics.mean(rt_us_vals):.0f} µs")
        print(f" median : {statistics.median(rt_us_vals):.0f} µs")
        print(f" p95 : {percentile(rt_us_vals,95):.0f} µs")
        print(f" p99 : {percentile(rt_us_vals,99):.0f} µs")

    if codes:
        print("\nHTTP status codes:")
        for code, cnt in sorted(codes.items()):
            print(f" {code}: {cnt}")
|
|
|
|
def main():
    """Command-line entry point: parse options, run the benchmark, report.

    Runs either a fixed number of requests (``-n``) or a timed run (``-d``),
    optionally prints one sample response body (``--print-one``), and then
    prints the summary. Invalid option values are rejected through
    ``ArgumentParser.error`` (usage message, exit status 2). A
    ``KeyboardInterrupt`` during the benchmark prints a notice and returns
    without a summary.
    """
    ap = argparse.ArgumentParser(description="aiohttp benchmark for /render")
    ap.add_argument("--url", default=DEFAULT_URL, help="Target URL")
    ap.add_argument("-c", "--concurrency", type=int, default=32, help="Concurrent workers")
    group = ap.add_mutually_exclusive_group()
    group.add_argument("-n", "--requests", type=int, default=1000, help="Total requests (fixed)")
    group.add_argument("-d", "--duration", type=float, help="Duration in seconds (open loop)")
    ap.add_argument("-t", "--timeout", type=float, default=10.0, help="Total request timeout (seconds)")
    ap.add_argument("--print-one", action="store_true", help="Print one sample response body for sanity")
    args = ap.parse_args()

    # With no workers the request queue is never consumed and the run hangs.
    if args.concurrency < 1:
        ap.error("--concurrency must be at least 1")
    # An explicit "-d 0" used to be treated as "no duration given" and
    # silently ran the default fixed-request benchmark instead.
    if args.duration is not None and args.duration <= 0:
        ap.error("--duration must be greater than 0")

    body = DEFAULT_BODY

    try:
        if args.duration is not None:
            results, failures, wall = asyncio.run(run_duration(args.url, args.duration, args.concurrency, args.timeout, body))
        else:
            results, failures, wall = asyncio.run(run_fixed_requests(args.url, args.requests, args.concurrency, args.timeout, body))
    except KeyboardInterrupt:
        print("\nInterrupted.")
        return

    # Optional: print one successful response body (sanity)
    if args.print_one:
        # do one extra request
        async def fetch_one():
            to = aiohttp.ClientTimeout(total=args.timeout)
            async with aiohttp.ClientSession(timeout=to) as s:
                async with s.post(args.url, json=body) as r:
                    print("\n--- Sample response body ---")
                    print(await r.text())

        try:
            asyncio.run(fetch_one())
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # The sample is only a sanity aid; its failure must not discard
            # the benchmark results gathered above.
            print(f"\nSample request failed: {e!r}")

    summarize(results, failures, wall)
|
|
|
|
# Start the benchmark only when the file is executed as a script, so that
# importing it has no side effects.
if __name__ == "__main__":
    main()
|
|
|