import asyncio
import time
import random
import statistics
from collections import deque
import websockets
# --- Test Configuration ---
# Address of the WebSocket server under test; every client connects to URI.
HOST = "127.0.0.1"
PORT = 8080
URI = f"ws://{HOST}:{PORT}"
# Client setup
# Number of concurrent subscriber and publisher connections main() spawns.
NUM_SUBSCRIBERS = 1000
NUM_PUBLISHERS = 10
# Each subscriber picks one channel at random for its lifetime; each publisher
# picks one at random per message.
CHANNELS = ["news", "sports", "tech", "finance", "weather"]
# Test execution
# Seconds main() sleeps before cancelling every client task. The report also
# divides the received count by this value to compute throughput.
TEST_DURATION_S = 15
# Target send rate per publisher; the pause between sends is derived as 1/rate.
MESSAGES_PER_SECOND_PER_PUBLISHER = 100
# --- Global State & Metrics ---
# All of the state below is shared by every client coroutine and read by
# print_report().
# Latency samples in seconds, one per successfully parsed received message.
latencies = deque()
# Total messages sent across all publishers.
messages_sent = 0
# Total successfully parsed messages received across all subscribers.
messages_received = 0
# Number of subscribers that have sent their "sub" command so far.
subscriber_setup_count = 0
# Set by the subscriber that brings subscriber_setup_count up to
# NUM_SUBSCRIBERS; publishers wait on it before sending anything.
# NOTE(review): this is constructed at import time, before any event loop is
# running. On Python < 3.10 asyncio primitives bind to the loop that is current
# at construction time -- confirm the minimum supported Python version.
all_subscribed_event = asyncio.Event()
def _latency_from_message(message, received_at):
    """Return seconds between a message's embedded send time and *received_at*.

    The message is expected to start with a ``time.time()`` timestamp
    followed by ``:``. Binary frames (``bytes``) are decoded as UTF-8 first.

    Raises:
        ValueError: if the payload cannot be decoded or the text before the
            first ``:`` is not a float (``UnicodeDecodeError`` is a subclass
            of ``ValueError``).
    """
    if isinstance(message, bytes):
        message = message.decode("utf-8")
    timestamp_text, _, _ = message.partition(":")
    return received_at - float(timestamp_text)


async def subscriber_client(client_id: int):
    """Subscribe to one random channel and record the latency of each message.

    Opens a connection to ``URI``, sends ``sub <channel>``, bumps the global
    ``subscriber_setup_count`` and, when it reaches ``NUM_SUBSCRIBERS``, sets
    ``all_subscribed_event`` so publishers can start. It then receives
    messages forever, appending one latency sample to ``latencies`` and
    incrementing ``messages_received`` for each well-formed message.

    Connection failures and unexpected errors are printed rather than raised.
    Cancellation is swallowed on purpose so the task ends quietly when the
    test shuts down.

    Args:
        client_id: Identifier used only in diagnostic output.
    """
    global subscriber_setup_count, messages_received
    channel = random.choice(CHANNELS)
    try:
        async with websockets.connect(URI) as websocket:
            await websocket.send(f"sub {channel}")
            # No await between the increment and the comparison, so exactly
            # one subscriber observes the final count and sets the event.
            subscriber_setup_count += 1
            if subscriber_setup_count == NUM_SUBSCRIBERS:
                print("✅ All subscribers are connected and subscribed. Starting publishers...")
                all_subscribed_event.set()
            while True:
                message = await websocket.recv()
                # Timestamp immediately so parsing cost is not counted as latency.
                received_at = time.time()
                try:
                    latency = _latency_from_message(message, received_at)
                except ValueError:
                    # A bad payload must not take the whole subscriber down.
                    print(f"Warning: Received malformed message: {message}")
                    continue
                latencies.append(latency)
                messages_received += 1
    except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError) as e:
        # ConnectionClosed covers both abnormal and normal closes, so a clean
        # server-side close is not reported as an "unexpected error".
        print(f"Subscriber {client_id} disconnected: {e}")
    except asyncio.CancelledError:
        # Deliberate: cancellation is how the test run stops its clients.
        pass
    except Exception as e:
        print(f"An unexpected error occurred in subscriber {client_id}: {e}")
async def publisher_client(client_id: int):
    """Publish timestamped messages to random channels at a steady rate.

    Waits for ``all_subscribed_event``, opens a connection to ``URI`` and then
    loops forever: pick a random channel, send
    ``pub <channel> <send_time>:<text>``, and increment the global
    ``messages_sent``. Sends are paced against a monotonic deadline so the
    time spent inside ``send()`` does not lower the effective rate below
    ``MESSAGES_PER_SECOND_PER_PUBLISHER``.

    Connection failures and unexpected errors are printed rather than raised.
    Cancellation is swallowed on purpose so the task ends quietly when the
    test shuts down.

    Args:
        client_id: Identifier embedded in each message and used in
            diagnostic output.
    """
    global messages_sent
    await all_subscribed_event.wait()
    send_interval = 1.0 / MESSAGES_PER_SECOND_PER_PUBLISHER
    try:
        async with websockets.connect(URI) as websocket:
            # Pacing uses the monotonic clock; the wall clock is used only for
            # the timestamp that subscribers compare against time.time().
            next_deadline = time.monotonic()
            while True:
                channel = random.choice(CHANNELS)
                send_time = time.time()
                message = f"{send_time:.6f}:Hello from publisher {client_id} on channel {channel}"
                await websocket.send(f"pub {channel} {message}")
                messages_sent += 1

                next_deadline += send_interval
                delay = next_deadline - time.monotonic()
                if delay < 0:
                    # Fell behind the schedule: restart it from now instead of
                    # bursting to catch up, which would distort latencies.
                    next_deadline = time.monotonic()
                    delay = 0
                # sleep(0) still yields to the event loop, so other clients
                # keep running and cancellation can be delivered.
                await asyncio.sleep(delay)
    except (websockets.exceptions.ConnectionClosed, ConnectionRefusedError) as e:
        # ConnectionClosed covers both abnormal and normal closes.
        print(f"Publisher {client_id} disconnected: {e}")
    except asyncio.CancelledError:
        # Deliberate: cancellation is how the test run stops its clients.
        pass
    except Exception as e:
        print(f"An unexpected error occurred in publisher {client_id}: {e}")
def _nearest_rank(sorted_values, percent):
    """Return the *percent*-th percentile of ascending *sorted_values*.

    Uses the nearest-rank definition: the smallest value with at least
    ``percent`` % of the samples at or below it. *percent* is an integer in
    1..100 and *sorted_values* must be non-empty.
    """
    count = len(sorted_values)
    # ceil(count * percent / 100) in exact integer arithmetic (no float error).
    rank = -((-count * percent) // 100)
    return sorted_values[max(rank, 1) - 1]


def print_report():
    """Print message totals, throughput and latency statistics to stdout.

    Reads the module-level ``latencies``, ``messages_sent``,
    ``messages_received`` and ``TEST_DURATION_S``. When no latency samples
    were collected, only a short notice is printed.
    """
    banner = "=" * 80
    print("\n" + banner)
    print("PERFORMANCE REPORT".center(80))
    print(banner)
    if not latencies:
        print("No messages were received. Cannot generate a report. Is the server running?")
        return

    total_sent = messages_sent
    total_received = messages_received
    # NOTE(review): loss compares publisher sends with subscriber receipts
    # one-to-one. If the server delivers each published message to several
    # subscribers, received exceeds sent and this clamps to 0 -- confirm the
    # intended definition.
    message_loss = max(0, total_sent - total_received)
    loss_rate = (message_loss / total_sent * 100) if total_sent > 0 else 0
    # NOTE(review): divides by the configured duration, not the measured
    # publishing window.
    throughput = total_received / TEST_DURATION_S
    print(f"Test Duration: {TEST_DURATION_S} seconds")
    print(f"Total Messages Sent: {total_sent}")
    print(f"Total Messages Rcvd: {total_received}")
    print(f"Message Loss: {message_loss} ({loss_rate:.2f}%)")
    print(f"Actual Throughput: {throughput:.2f} msg/sec")
    print("-" * 80)

    sorted_latencies = sorted(latencies)
    # Samples are stored in seconds; the report is in milliseconds.
    avg_latency_ms = statistics.mean(sorted_latencies) * 1000
    min_latency_ms = sorted_latencies[0] * 1000
    max_latency_ms = sorted_latencies[-1] * 1000
    p50_latency_ms = statistics.median(sorted_latencies) * 1000
    # Nearest-rank indexing: the previous int(n * p) index sat one rank too
    # high (with 20 samples, p95 was simply the maximum).
    p95_latency_ms = _nearest_rank(sorted_latencies, 95) * 1000
    p99_latency_ms = _nearest_rank(sorted_latencies, 99) * 1000
    print("Latency Statistics (ms):")
    print(f" Average: {avg_latency_ms:.4f} ms")
    print(f" Min: {min_latency_ms:.4f} ms")
    print(f" Max: {max_latency_ms:.4f} ms")
    print(f" Median (p50): {p50_latency_ms:.4f} ms")
    print(f" 95th Percentile: {p95_latency_ms:.4f} ms")
    print(f" 99th Percentile: {p99_latency_ms:.4f} ms")
    print(banner)
async def main():
    """Run the load test: start all clients, wait, cancel them, then report.

    Spawns ``NUM_SUBSCRIBERS`` subscriber tasks and ``NUM_PUBLISHERS``
    publisher tasks, sleeps for ``TEST_DURATION_S`` seconds, then cancels
    every task, waits for them to finish and prints the report. The shutdown
    and report also run if the sleep is interrupted.
    """
    global all_subscribed_event
    # Re-create the event inside the running loop. The module-level instance
    # is built at import time; on Python < 3.10 asyncio primitives bind to the
    # loop current at construction, which is not the loop asyncio.run()
    # creates, so publishers awaiting it fail with "attached to a different
    # loop". Clients look the global up when they first run, which happens
    # only after this coroutine yields, so they all see the new instance.
    all_subscribed_event = asyncio.Event()

    print("Starting WebSocket Pub/Sub Load Test...")
    print(f"Simulating {NUM_SUBSCRIBERS} subscribers and {NUM_PUBLISHERS} publishers.")
    print(f"Publishing at ~{NUM_PUBLISHERS * MESSAGES_PER_SECOND_PER_PUBLISHER} msg/sec for {TEST_DURATION_S} seconds.")
    print("-"*80)

    subscriber_tasks = [
        asyncio.create_task(subscriber_client(i)) for i in range(NUM_SUBSCRIBERS)
    ]
    publisher_tasks = [
        asyncio.create_task(publisher_client(i)) for i in range(NUM_PUBLISHERS)
    ]
    all_tasks = subscriber_tasks + publisher_tasks
    try:
        # NOTE(review): the timer starts at task creation, so subscriber
        # connection setup counts against the test duration.
        await asyncio.sleep(TEST_DURATION_S)
    finally:
        print("\nTest duration finished. Shutting down clients...")
        for task in all_tasks:
            task.cancel()
        # return_exceptions=True keeps one failed client from hiding the rest
        # and from preventing the report.
        await asyncio.gather(*all_tasks, return_exceptions=True)
        print_report()
if __name__ == "__main__":
    # Script entry point: run the whole load test on a fresh event loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop the test early; exit without a
        # traceback.
        print("\nTest interrupted by user.")