From 035df8254b5f9dc922185df5f61d5c61da7f1a6f Mon Sep 17 00:00:00 2001 From: retoor Date: Sat, 27 Sep 2025 00:33:35 +0200 Subject: [PATCH] Initial commit --- Makefile | 10 + load_test | Bin 0 -> 22288 bytes load_test.c | 518 +++++++++++++++++++++++++++++++++++ rpubsub | Bin 0 -> 35336 bytes rpubsub.c | 766 ++++++++++++++++++++++++++++++++++++++++++++++++++++ test.py | 148 ++++++++++ 6 files changed, 1442 insertions(+) create mode 100644 Makefile create mode 100755 load_test create mode 100644 load_test.c create mode 100755 rpubsub create mode 100644 rpubsub.c create mode 100644 test.py diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..df02419 --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ + +build: + gcc -D_GNU_SOURCE -O3 -march=native -mtune=native -flto -pthread rpubsub.c -o rpubsub -lrt -std=c11 + gcc -O3 -march=native -std=c11 -o load_test load_test.c -lm -lrt + +clean: + rm -f rpubsub load_test + + + diff --git a/load_test b/load_test new file mode 100755 index 0000000000000000000000000000000000000000..2e5cd72305b541923a0967f0b3ec80075ba33384 GIT binary patch literal 22288 zcmeHvdw5huw(ssdFhJ;TG)P30tu`|WN=SH_Ab~XLz>ahvLJ&~Krb%~5j(OSL;o*y5 zC&<1{Yp#zO=QwwU(R1fGPn|OkBOC`iFF@3pz>FRpUjYTX!AHP37(nRrTeYjYcenI> z=eu|QxZiiDpnFyQYOPgOtE$$ns@|zB750T`27@F*x-?IslsrL0%6LZgQXT;*lZvG= z_?{$PD~&)pgJD{JnMRPSg}2iZX%@%RK}jx`BA1{OHCPf<)Q~92r3x>M(Q*Wp%vze{ zGAYW_$+T&b#wV!AkCj)b-4sSFCskNI5xC*@Uhx%Szbo@Dx>21|m9 z@@|71+2y|*L^m(v<)!KencGWHZdPepW5c@HGp9AWrZzS-w{4obsd)C(*)t2gEd|qA z6{ub^J~XBlEmDe%`*;7_Np z&tFsUUk?6DwfE{2{AvokCYGJM2xt-;x6VSqgkr3jBHC zG*6PlGXTa)6Qs4(T8{9KlDoB~vC&cMYm{2u9#4x$s%>nkz1y+g?ejG>xh03g>vJMq zTYtBs&e_lipw-jR?5iV0s`I$rQeAB`q?_DLO)VSTlGo?)wYVfNl(>D8$L)0SA~rf3 ze3Xm-lGokrf~Gp1swQV63I+(rR7-_=e2yk(Lo-BNsF>vSIGIfCCa0sWq1oBkaIafb z9;Nu)MH1>v#kREis1aVbRNKggoMGX_38hLAhgkK}jrDG&2^R0hcKl;%L#$F>pp4onA024j0Un9F;Xy z4z$m`zQOBrdupo68(W&)HO_U7Wcu|@EzR5%4vqDajO0Pn@pMdw*Gy;sGT}QZb{S6_ z@I2IV(qdQ|xJPo7^0Yvl6ESwh^X00B2CO(nO5ftx6aVwOaLZXzAy0d*Y8WX&=Pf*) z!}v!?i+K8bj6YMlji+B^=?rNNPq$4V`E+SLPy3j@G|7uJ1+lJ(jaPCE2w}c%5O|2M z=dzN4PxIzI$Zzo8`k01>G;JwC*68~FuX)#x(tMySOd5o!qYT_eP`)#dVY&w*5T|q z%`aK3bAxnjLfbu{Yu zSLygoI()4TH|y|v9jR~w7{39 zYtJeEPqUOj<~#G@4mv}=wCI51@5}0C#YAWIU~3Sa+=p+|q%w*4>nSI4G#ZW8b+I%# z%E;k(njB=LC!Qw99{IO;njB^1nRuFJvkMwZ3X z5H-Ul7ovJji>2B6geDElcS6D z#M9*9BL5aolVgiK6Hk*vi#!%jlOu~f6i<@_i+JK`a$J#h@iaNC$g+5v993jNJWUQN zGCh`#PQF+wNp+@4Hi`9Tbn@poZP2HWC#63~O23tq{zFoFUs8H+QhHZX`d3NmpCqM! zn3Vo*Qu>jk^tPn*gGuQPN$KXKbX`*VuB7zsN$Dj?>4iz@c}eNnN$G;5^tY1I*CwT} zjHQ)eY2`#Li#jk`BTF&R0Q_$}c*0%)N}%*baE8B)u2%y0TXMtKAQmXyNFa^3m)Oib zj`;^M2Z_zq#2z4yp$o)(D@EO4K6waCHxbi#jcFC-yauL|zeA3JaxM_xPwNUylR7Zw zB#HS4>q_hG))m&870Z>apVG=%@n>74zY$*r2T~MuDS?d1c~GLL9TsIepisC9`qlm6 z5g@Nf_3Fs59Wh1CeB&0db%snGtl7a?Z{k_{3b`T3lz?S7xr9AoI3guDE3C_$NSQRh zMdo>3W)y7$f1wWg6i`t#TmdmPa~ewZ4@XTMA0he5+R_VB_+=7NZI)bCo1Xwzf;E;J zXn6{=l2vKH%~G8vP3j7_g4Z5!S#m3iKC=gXmTD!~XeqO+ADJdeMThrN-!OsjWl*z+ zj_mf5=d`LB7Ii-w*7Y^&60&Z+@Cn*wgF+?2}^DndT%KzWDVq40;MbFkl7=w4p3Z%5z=l~T^3vTZWPMe z{5Ua3`zyEh6+#xZIE`AEI?HKnsGk=_?Ln7iZTRc+(P;mxE6*rFn`N$|uCd5{M_!v^ zvnXUFGsW$evi|dTW&RvS?mxe(8=tAqv-sGXEOlK)q%6FPv>aLaZvT0+#nN}=BT_(w z%eDyFv(pAm+a5u`2TC6=ktD6xsa|&Vc-V@XYc;D4hh5O91kf@&<^`)vRz>G$53XGt zv^7ap>hY@JO&uq%uIqX(!_+y9o_Nk+>Kp{E^zX~AD*4=Zb7kPJ3~OLSV0qSaxu(u1 zk=1$FcZk?_6MKg5O-22XxWlXQc!5aU8}$4PlwK?bE5-~xfBL&uvRYsSoayWCowC30 zNG}?dWe+~r{_p2tkGnD-$7&HadA0Az*#}=cnPZLatK8bpEbhNJ%GB{5hKAMuKvqWE zu}c5%sTZZTcfs3yvafsdv|myudtPJq+f7edd+-OA&@^;9IC{TtS3j&!Pwl0R&R1uZ zV5SLGvj;QtFq3x^sMH>|2XY3g_s~pYzQVf9uGUzJ?dZtdaOfO_m8k`vkvIp&TkgHPtlRLoG=hY8B6Z4In|k>0!+t^U)h z%r{Y$O0|>PMZ*z2Dbe)NhGV0prmy?Hv7OXNC7QV#GU3;u`;OI-H(?g`eDj~nG<7UO zDsV-hbPG6&UfoNr2|o+Qz%Ah&Ap6U+$TkrfZ0NLbE;u;E7}wx|A^V2F)`+l6qrnA<xi|sO*_xR@o0hcx9EXy)+1vlK0zQhbzb=ugX09E#QIQe;Lx@ zX(U#vQuJRCEea`tCzvO7DL3XY*V=ZuvbAp&eCwUxv97Ux$9k95(HmX`Ug5xLeOL5Z zrF!TtMLnwcKOU;CsVE8+y$Xka-8qclagWcEq`$@;KJZgOFqlpdDH-;S!_smC#j&&; z86D*9?Bx{v-Hd_9=%h`f{M4f-A_14r=QZAy4&dFa2N0 zmfCJyq4-O$r>LSH^JT*d-$Eih@~dbx>_T1lWws%0_zq+xy?kUNGI@PSAH0kSpAjzs zj`p9O*1d6LQ3&*a>3gBxx_Dc~dUyqjv(_wE)blXVAf9VGG2r_zw2X610KM2^On8z# z#byKGg|aQM-rRSzO*$CZm|+Vn&FVY)fYj4}u4TL}uqeC#!j^0czHAn`Lj6by-fyYK zlz#B=$vi68R&-E_hKf%2eRSW@nY7>&V}E?|o#qO)w?aK&RZrQ}x2)>nzTW$W4qmPJ zhfNz+1!^81v>D!+QS*;r*8c@PS( zJ6&ZDKuqBKn;*L4B;1}2 zzaJ&n_pR=A2T@M7sdFK-a`OlMXPb{!82b8po!568`g&Jif6!o4Yt8+q8e#m4u*;To zCD36Z`(gZQOM|xT{^5;E&^>ggb_08_1>C3T4$_*^a2`tSu^pu$%+o!I~QBU|U8vO4> zyH8xx9ZokK^bgKgSLWJ+^%H|NnNp?Ug8$t7jgzpI%2iL;)i3bajZdL+-5X4~;( zydUlbUL5JXU+p>cr{qmxBb0k{?L|^u;*C*`;~?dA~Yeu zR~fjnGT4??g<0o6K3u7;kQN7O<_#}aYf6VJ^Z&CdKWsI;Z}lG=w)!t*dA3rl=&*Xo z-;-4tn46`jbB#8X`8iitCRG_O1{?E2kxZz!2ClS1VL23jT@Hm`N9F-7H~p?=-o-is zE=HyT(Dhc_{>=1>dI7WQOR~Lg3SpfCPi86}T9Ol3m=m}?S5dPqYPY{TOI?y<^3TEe zi1w(r=k6=Nv)vx7od{P;4c_T9n#gxUiWsLN5_GcbI#oJu37kCLij^J7?E z*wwEtSd-NE|icg6;8glWW22Y3ihY@O3_kT5D^?x$|x%sA!D(WBqehj5A*n_>& z@L{AH82HqluLk@_8P{$+0d2u`6JfXiz#^G*{x$M@;Nhav(I{KruTX-wr;!h2t9%S* z-f7hdou;7EUi$gZv;gmz=^V$rqb<$Pm6q4yy~lR!aWjy#d7;P(nrymG{tQm{C#B(PJ`t@=@`e+bf!` zsBbG@9amHwvK+mr911J>Z^OBp(-n0%5jXS@lt18`Sef6i?yEGsfiY{=#;kRpZJbn! zfpmJozKmbPvtdCT9|0DOfNDiO0H;XL^zZb>{UIt=6pfsr{<~+0_aEk0Ucz<@s}gh; z=FB{>^bf7X;`2)^xwDAinS@p81?)6wwa!*B@q#l^a9(eCJj8U*Q_`O>6qxoJi&;Gv zC_OntyHUkG zbWFt-leD#>{Z$$UqVL;9p~_%(j_7wdilgbKT{Srw(=lhaV{BsDmh`qA$ByXQ3?)#* zrY>KNmB4;xn!2(2y|fiwRc9(bhk)54#j4a`ZfouDW;<76QG?B~p%lRe_T)BkC4 zLR<2%49B+2tOT(Htx}KGb%mdX#>!~__q$Kz8U{?eLiq#M?G@2VtSdj>tL+v)36E#4 z^Y^q?w+F}k5yxq}CB&<-SGXT*Wo?yDh@$ouh3dPo9ScAE8LVBAg}EnTMNhq(qdXNT z{rX1Oj4$JJXc@1F)qNv&21Ybu)g9FL7%<`ye>sd&45z5Mw8$M$9R4l7m|vszYJHAT zCMPsDR|z~Um!VRD@3H+u@Wsoh1%dfWeowdq3do6U??nZbbU$NMHFAre_`R0IF=*=i zlvw?J@}0W*)PMGi5UtiNXcrs8SW+-?(a3CA0F7K@F^3CLW}rOFKWs2{JON$);qWZB4Sa4Yfw0EU->OZRjZ>s@Qj z9_nm+aD6t8bg=5(`soXh(HXzP@*Dt6oWSqs9D+=K+HQb9fkLwSBjnbxb%x<7Ur`zH zpG)&i^PkHwb?ikpt@TJtY#^9A>18qOSEw*PN$5DI2FJg&&gs^a`}P5=;MgyDfgLZR zRRM2?vY(X%1ybi3Q%5t5$W2e48qZ0eF+B35X35T-5Y{Z&$}KrBKW@o&V1p$Wg9uAj zf{oAn!19dF!=}!gft;KIKcxg7rp=*5wg1bz#-!nNy#LEp2m8NVa{xtT_h9Qi8w`=FP_-^JAld(g>jO z7#NLh?C-vXsszVvzyn7+yx0Zh+Q@tht~ugFG2v&zC`&tU#e!_sZ=jYt4R-PxyrvR! zf|kxQhr!(jJ7{LC>%ENs}PEHB!p<`C4p%)r>fmlk6>%rrM4!7P6 zcTsVHai=CTw&!6>QLhbJ&L6*-vj#bK^=M!B+2(%eo+Wg%rD}K{a_LlwvKAy}-9Rce z=OIclvr=Ylrcwf>_p-vqoD@&ou;CM`-k3WG?|jwNaTbkJf>vO&mJsG2PBU$b5~v=; zDMtU(fIsLF3j#B>U9S6xYptdWqvOVA(!J=3Zq8-15$yqzJSs%8s%4Yu3LhCd$O>27C zeE=^=Go2G9wNvWD7BOF8(uK?6H`K6FAE*+YlUJs3-&$ZF(j{;L1{ zn#?kiWQI5G8J-_0A^%QzULxMhoIPJZP<~coeuX}tmyO=W{*IOXE-%|jY+XDb{U!4M z#Pffo&&L=R`5*E8Uh=g<9zI9ppPYh6)v8w`V|e}{MSV+ad&Ga-po~04&8MFoLS#mj zw!C)rh}HkGfjpOTC}JOZg!~sCe$etPvS}Sevpez-%q8YE(nW{akF<1Fh|D?2)b@K_{);+I=Kpts0dF(4OGVRfE-1vmBKAU}ti9+sX4IHbl3Y|+Bufh3E%eqq?{>@d zj-%Tp*R^=$I+r}zHA?cft&=BvrODn=Qca7`*(g`Jyfw^v^7ZuZTQCpWfzBWGFd z1{cq<}sE-~nHOH$`*Z)wVntWgbaZ_*o> zJiE5B0h(i(zQ#GS)8&#{nI*}X&F+m}$=izeFTIk_U0W~F8=cLyZfPUlNYm z9=FV2EyYm5(23`|3aFz!zJ})YGMd}i;6-b^1qB6oys$8;^zR(FVj3N(Lv_Wng-e%J zS(lVo$jd6KmoBSG%}Y7hq5YkMX#bKHxk=2Qjd<@>_PA@^4IA7rT)DHkxy2{1cQ<2J z_}sD+c{D@{pHDjlhGq?^pFelhzCAt zWqXU)8=E(nx^mv+g6VaWC+BlcYpt&hbG)YB)6%xSzO@a5jV8x*nw0COK`ZB*`gabZ zqOs8v8?JJ7+q!AXF?H-MPM1vW)eR?Uc|%hhW+RO^YPEipVq5~!jY28|%4kp$ztbl_ z2nOCR=JMi(69aZ~!R)#@iW~1{%XOZXrnoxN-O?=A)}z!WAvzzStjF{?Buj{LAkAB;UcB-*2VU! zB}=PsTeiGr#qBFst-iy#uGZzQTVLPsPj@#qHMg|hgE!CHHf-Fq`QE~!=`&`|ntgL| z$&J$tH&m=wfBwc77vJ0S)P|br=Ak2pdqox8Z@gjE;;YLvTANbme88XW+C*T=xY7XqyTU5gdq{h(c-U7)nCe~saJGP)Tpo5^T zpk>(S?;srXInZjHvAjn(PNOU+=LKv56wvA~qtT~8DTsGd4figS44ce`tFtf5qOu6b zPrcHCqtPgBnsCx22Vj8Bx9?-Jg-Z)i4T` zwT@=*updD7d$8LLpcE8*-os}Y7<gnv)mXS~#6n}S z|1z7gdh;k_aiy`)YRp?;l)<>bXkK8H62bW#7^gnoFQ>JirkR=qA8=~`s0PA@b- z8`LLhTR1uo;y&o;!nWjO;(2c)?-$4`ro7vXW`A0pF$>W`eEy2O0pz_wd1%XyG@CJZ zTe{6C`!g1yhzCa*WvemQDhyx)8;!R{?D9`?nIPG6N4nK0Z_BV5^Zc1M0cH_B+X8nYIUX7zglJZmwA ze@J7L)$dlSAIkD)+-e*!q&4X)Xk&Id0GXFD@9?V_#^iv(pRp~S<^r|M~Q4|W-K$iP^AiOcdal+^E5t>0gg?Z%hN@L6j& zcDd7cq#K`d+l@QjY5#(Vm7wXD9v9ZpG}m^ZtdVF#2jUXA{g-VUu_H5>@ksikXu&_i z|JN3{T5G-d`xlx%6ejWrj@Grnoh^ROU|}s@lA>^xMq+ za`@ehr9b8h&hoU-$JPJci1@qdWQB#izsoqS=5#Hmt(>-V+QsQkPG8_O#OVO1gPaa= zia#o3fnGbHAah#CX&I;0oUY}xmD6@kyExs+=?k2OI33`0kW;O+|Ngi5JFsL4@poY2 z@4lWX(m2H5b&0>@T2x*>N6uTZuC3YECKt^pm{CwTb#@y|PJeLv%!0z1`5aCzAu&zE zE<-wnlas_i!^wag^7RyC_AN>DYW=17@e`z!CYhxWF(3%D6e3ReA&#?W2CzB!TuM&` z=dTtM9cc=S@F6|zN%$Qc7ygELDYW8akS0j=)hvm)7Vc;Gm9f9qkC!Gj6XN#e*|ts1MwEk zH$r|2oZi~G9PKS+i)Kk;YZyo`Ou=890&nMf#2@vs^&j-m+FbrdZ6xUe{G?1?p7)Dj z`32J>QgKSJfKolzUaw(n4TmrF`2Q&deqRdw5r&&3=>|Y!9_ ziuqf{^^5T%?7uvP9%*^4$5&L)g4+R{>lz$B=X$9T=YkF&oeS1VIE`~Px*c@wLP2dy zqf4SI6to!RZ~%taU23KHg#!*>lLL1pG~*=S;c&G$);G4Sb2d6$z7~(y;cVL^)wVRX zHoAQ{!7IEp2VKU{;Bb08&dm-S`g=CxQV3_0+u>?!YT67DUCIGYUjnPcv2dBSs=`sR z#OA;a2e#EqtW}lekizvajtYh6Dz;^kW08I70;}Dzbm79~6*Z0;>jHZPBoZ!mC=<6Z z%)reGbTvf`plcU&rh#12EfX z%QhTMxFdsIQUUYRk0t!xj0A3Wu|@&{iTK?du^e1ALEVI(C3vmSnQoWU=ak~V(}+WN z3=QqPU1A+a7EHL`Bc^nw#BR(0Kb^s|8#`ioWC&LI1SHnD7)m!|&`^v4B*-rENI>HE zb|k>D$~$UXurTOy{g z{!W(|7d3%Tzm1*-vJK6eOWtL|2a$H8OH4rtY^A^WC6+<$WNw9&0{m9xE?D2(R?zBc!PPmw%{pjZ zTLW&^YH)GLx}b6@d1}U9@ATG71+LA_C_TF9bXG%!#2gNnY&N1(g%ozahU8`Dbt@B)(eDRFL=Tz@Cz>1a#H zzs>6}DE*E@fow!}7V_eJtpOOZ2zi+wkO?Zz<-o!M>ClIJkf1X!AurCk1YO9PMg0Yz zpbsLS&d@|!oTCZa#pRRh&tWNu2=$4O7w39{igP_tescT2$K@Arh2or0(4AZ{$&<~J z<$nx}Y$Ei>){A`Im~@=+jsE{ZlDy321ZBUSfky{$zmO!4 zMXVM$4JCEB&?o5bBzbZEB7yC>>+hZN0 zX^@2?PHDeLO%(FtyhqR%llYU%e>X|K7=Ln0LC|0D`KO`r@gvg55TWZg1e>_uO5EQl z#+%?5X+cjR!=#fJ=knpXnuOp>C4T`KWFk?%I0uW}$I1JjyM@^J7ybBi$P$Z?7w7VW z^8g~qN#jSaIy>-vhkib=ddmgD6*E TG~F*HKb!wPG9gJIDJlIIX?F+c literal 0 HcmV?d00001 diff --git a/load_test.c b/load_test.c new file mode 100644 index 0000000..ebbfde7 --- /dev/null +++ b/load_test.c @@ -0,0 +1,518 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// --- Test Configuration --- +#define HOST "127.0.0.1" +#define PORT 8080 +#define NUM_SUBSCRIBERS 1000 +#define NUM_PUBLISHERS 10 +#define TOTAL_CLIENTS (NUM_SUBSCRIBERS + NUM_PUBLISHERS) +#define TEST_DURATION_S 15 +#define MESSAGES_PER_SECOND_PER_PUBLISHER 100 + +// --- Internal Configuration --- +#define MAX_EVENTS TOTAL_CLIENTS +#define RW_BUFFER_SIZE 8192 +#define MAX_LATENCIES 20000000 // Pre-allocate for ~1.3M messages/sec + +// --- WebSocket Constants --- +#define WEBSOCKET_KEY_MAGIC "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + +// --- Helper Enums --- +typedef enum { + CLIENT_SUBSCRIBER, + CLIENT_PUBLISHER +} ClientType; + +typedef enum { + STATE_CONNECTING, + STATE_HANDSHAKE_SEND, + STATE_HANDSHAKE_RECV, + STATE_SUBSCRIBING, + STATE_RUNNING, + STATE_CLOSED +} ClientState; + +// --- Client State Structure --- +typedef struct { + int fd; + ClientType type; + ClientState state; + char read_buf[RW_BUFFER_SIZE]; + size_t read_len; + char write_buf[RW_BUFFER_SIZE]; + size_t write_len; + size_t write_pos; + double next_send_time; +} Client; + +// --- Global State & Metrics --- +double* latencies; +size_t latencies_count = 0; +uint64_t messages_sent = 0; +uint64_t messages_received = 0; +int subscriber_setup_count = 0; +int all_subscribed = 0; +int epoll_fd; + +const char* CHANNELS[] = {"news", "sports", "tech", "finance", "weather"}; +const int NUM_CHANNELS = sizeof(CHANNELS) / sizeof(CHANNELS[0]); + + +// ============================================================================= +// START: Clean SHA-1 and Base64 Implementations +// ============================================================================= + +// --- SHA-1 Implementation --- +typedef struct { + uint32_t state[5]; + uint32_t count[2]; + unsigned char buffer[64]; +} SHA1_CTX; + +#define SHA1_ROTLEFT(n,c) (((n) << (c)) | ((n) >> (32 - (c)))) + +void SHA1_Transform(uint32_t state[5], const unsigned char buffer[64]) { + uint32_t a, b, c, d, e; + uint32_t block[16]; + memcpy(block, buffer, 64); + for(int i = 0; i < 16; i++) { + uint8_t *p = (uint8_t*)&block[i]; + block[i] = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3]; + } + + a = state[0]; b = state[1]; c = state[2]; d = state[3]; e = state[4]; + + uint32_t W[80]; + for (int t = 0; t < 80; t++) { + if (t < 16) { + W[t] = block[t]; + } else { + W[t] = SHA1_ROTLEFT(W[t - 3] ^ W[t - 8] ^ W[t - 14] ^ W[t - 16], 1); + } + uint32_t temp = SHA1_ROTLEFT(a, 5) + e + W[t]; + if (t < 20) temp += ((b & c) | (~b & d)) + 0x5A827999; + else if (t < 40) temp += (b ^ c ^ d) + 0x6ED9EBA1; + else if (t < 60) temp += ((b & c) | (b & d) | (c & d)) + 0x8F1BBCDC; + else temp += (b ^ c ^ d) + 0xCA62C1D6; + e = d; d = c; c = SHA1_ROTLEFT(b, 30); b = a; a = temp; + } + state[0] += a; state[1] += b; state[2] += c; state[3] += d; state[4] += e; +} + +void SHA1_Init(SHA1_CTX* context) { + context->state[0] = 0x67452301; + context->state[1] = 0xEFCDAB89; + context->state[2] = 0x98BADCFE; + context->state[3] = 0x10325476; + context->state[4] = 0xC3D2E1F0; + context->count[0] = context->count[1] = 0; +} + +void SHA1_Update(SHA1_CTX* context, const unsigned char* data, uint32_t len) { + uint32_t i, j; + j = context->count[0]; + if ((context->count[0] += len << 3) < j) context->count[1]++; + context->count[1] += (len >> 29); + j = (j >> 3) & 63; + if ((j + len) > 63) { + memcpy(&context->buffer[j], data, (i = 64 - j)); + SHA1_Transform(context->state, context->buffer); + for (; i + 63 < len; i += 64) { + SHA1_Transform(context->state, &data[i]); + } + j = 0; + } else { + i = 0; + } + memcpy(&context->buffer[j], &data[i], len - i); +} + +void SHA1_Final(unsigned char digest[20], SHA1_CTX* context) { + uint32_t i; + unsigned char finalcount[8]; + for (i = 0; i < 8; i++) { + finalcount[i] = (unsigned char)((context->count[(i >= 4 ? 0 : 1)] >> ((3 - (i & 3)) * 8)) & 255); + } + SHA1_Update(context, (unsigned char*)"\x80", 1); + while ((context->count[0] & 504) != 448) { + SHA1_Update(context, (unsigned char*)"\0", 1); + } + SHA1_Update(context, finalcount, 8); + for (i = 0; i < 20; i++) { + digest[i] = (unsigned char)((context->state[i >> 2] >> ((3 - (i & 3)) * 8)) & 255); + } +} + +// --- Base64 Implementation --- +char* base64_encode(const unsigned char *data, size_t input_length) { + const char b64_table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + size_t output_length = 4 * ((input_length + 2) / 3); + char *encoded_data = malloc(output_length + 1); + if (encoded_data == NULL) return NULL; + + for (size_t i = 0, j = 0; i < input_length;) { + uint32_t octet_a = i < input_length ? data[i++] : 0; + uint32_t octet_b = i < input_length ? data[i++] : 0; + uint32_t octet_c = i < input_length ? data[i++] : 0; + uint32_t triple = (octet_a << 16) + (octet_b << 8) + octet_c; + + encoded_data[j++] = b64_table[(triple >> 18) & 0x3F]; + encoded_data[j++] = b64_table[(triple >> 12) & 0x3F]; + encoded_data[j++] = b64_table[(triple >> 6) & 0x3F]; + encoded_data[j++] = b64_table[triple & 0x3F]; + } + + for (size_t i = 0; i < (3 - input_length % 3) % 3; i++) { + encoded_data[output_length - 1 - i] = '='; + } + encoded_data[output_length] = '\0'; + return encoded_data; +} + +// ============================================================================= +// END: Clean SHA-1 and Base64 Implementations +// ============================================================================= + + +// --- Utility Functions --- +double get_time_double() { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return ts.tv_sec + ts.tv_nsec / 1e9; +} + +void epoll_ctl_mod(int fd, uint32_t events, void* ptr) { + struct epoll_event ev; + ev.events = events; + ev.data.ptr = ptr; + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev); +} + +void close_client(Client* client) { + if (client->state != STATE_CLOSED) { + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client->fd, NULL); + close(client->fd); + client->state = STATE_CLOSED; + } +} + +// --- WebSocket Core Functions --- +size_t create_ws_frame(const char* payload, size_t payload_len, char* out_buffer) { + size_t frame_len = 2 + payload_len + 4; // Header + Mask + Payload + if (payload_len > 125) frame_len += 2; // For 16-bit length + + out_buffer[0] = 0x81; // FIN + Text Frame + if (payload_len <= 125) { + out_buffer[1] = 0x80 | payload_len; + } else { + out_buffer[1] = 0x80 | 126; + *(uint16_t*)(out_buffer + 2) = htons(payload_len); + } + + size_t header_len = (payload_len <= 125) ? 2 : 4; + uint32_t mask = rand(); + *(uint32_t*)(out_buffer + header_len) = mask; + + uint8_t* mask_bytes = (uint8_t*)&mask; + for(size_t i = 0; i < payload_len; ++i) { + out_buffer[header_len + 4 + i] = payload[i] ^ mask_bytes[i % 4]; + } + return header_len + 4 + payload_len; +} + +void send_handshake(Client* client) { + unsigned char key_bytes[16]; + for (int i = 0; i < 16; i++) key_bytes[i] = rand() % 256; + char* b64_key = base64_encode(key_bytes, 16); + + client->write_len = snprintf(client->write_buf, RW_BUFFER_SIZE, + "GET / HTTP/1.1\r\n" + "Host: %s:%d\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: %s\r\n" + "Sec-WebSocket-Version: 13\r\n\r\n", + HOST, PORT, b64_key); + + client->state = STATE_HANDSHAKE_SEND; + epoll_ctl_mod(client->fd, EPOLLIN | EPOLLOUT | EPOLLET, client); + free(b64_key); +} + +// --- Event Handlers --- +void handle_write(Client* client) { + ssize_t sent = send(client->fd, client->write_buf + client->write_pos, client->write_len - client->write_pos, 0); + if (sent < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) close_client(client); + return; + } + client->write_pos += sent; + + if (client->write_pos >= client->write_len) { + client->write_pos = 0; + client->write_len = 0; + epoll_ctl_mod(client->fd, EPOLLIN | EPOLLET, client); // Done writing, wait for reads + + if (client->state == STATE_HANDSHAKE_SEND) { + client->state = STATE_HANDSHAKE_RECV; + } else if (client->state == STATE_SUBSCRIBING) { + client->state = STATE_RUNNING; + subscriber_setup_count++; + if (subscriber_setup_count == NUM_SUBSCRIBERS) { + printf("✅ All subscribers are connected and subscribed. Starting publishers...\n"); + all_subscribed = 1; + } + } + } +} + +void handle_read(Client* client) { + ssize_t n = read(client->fd, client->read_buf + client->read_len, RW_BUFFER_SIZE - client->read_len); + if (n <= 0) { + if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) perror("read error"); + close_client(client); + return; + } + client->read_len += n; + + if (client->state == STATE_HANDSHAKE_RECV) { + if (strstr(client->read_buf, "\r\n\r\n")) { + if (strstr(client->read_buf, " 101 ") == NULL) { + fprintf(stderr, "Handshake failed for fd %d\n", client->fd); + close_client(client); + return; + } + client->read_len = 0; // Handshake complete, clear buffer + + if (client->type == CLIENT_SUBSCRIBER) { + const char* channel = CHANNELS[rand() % NUM_CHANNELS]; + char sub_msg[128]; + int msg_len = snprintf(sub_msg, sizeof(sub_msg), "sub %s", channel); + client->write_len = create_ws_frame(sub_msg, msg_len, client->write_buf); + client->state = STATE_SUBSCRIBING; + epoll_ctl_mod(client->fd, EPOLLIN | EPOLLOUT | EPOLLET, client); + } else { // Publisher + client->state = STATE_RUNNING; + } + } + } else if (client->state == STATE_RUNNING && client->type == CLIENT_SUBSCRIBER) { + // Simple WebSocket frame parsing for this specific test case + while (client->read_len >= 2) { + uint64_t payload_len = client->read_buf[1] & 0x7F; + size_t header_len = 2; + if (payload_len == 126) { + if (client->read_len < 4) break; + payload_len = ntohs(*(uint16_t*)(client->read_buf + 2)); + header_len = 4; + } else if (payload_len == 127) { + // Not expected for this test, would require 64-bit length handling + close_client(client); + break; + } + + if (client->read_len >= header_len + payload_len) { + char* payload = client->read_buf + header_len; + double sent_time = atof(payload); + if (sent_time > 0) { + double latency = get_time_double() - sent_time; + if (latencies_count < MAX_LATENCIES) { + latencies[latencies_count++] = latency; + } + messages_received++; + } + + size_t frame_size = header_len + payload_len; + memmove(client->read_buf, client->read_buf + frame_size, client->read_len - frame_size); + client->read_len -= frame_size; + } else { + break; // Incomplete frame + } + } + } +} + +// --- Statistics --- +int compare_doubles(const void* a, const void* b) { + double da = *(const double*)a; + double db = *(const double*)b; + if (da < db) return -1; + if (da > db) return 1; + return 0; +} + +void print_report() { + printf("\n" + "================================================================================\n"); + printf("%s\n", " PERFORMANCE REPORT "); + printf("================================================================================\n"); + + if (latencies_count == 0) { + printf("No messages were received. Cannot generate a report. Is the server running?\n"); + return; + } + + uint64_t message_loss = (messages_sent > messages_received) ? (messages_sent - messages_received) : 0; + double loss_rate = (messages_sent > 0) ? ((double)message_loss / messages_sent * 100.0) : 0; + double throughput = (double)messages_received / TEST_DURATION_S; + + printf("Test Duration: %d seconds\n", TEST_DURATION_S); + printf("Total Messages Sent: %lu\n", messages_sent); + printf("Total Messages Rcvd: %lu\n", messages_received); + printf("Message Loss: %lu (%.2f%%)\n", message_loss, loss_rate); + printf("Actual Throughput: %.2f msg/sec\n", throughput); + printf("--------------------------------------------------------------------------------\n"); + + qsort(latencies, latencies_count, sizeof(double), compare_doubles); + + double sum = 0; + for (size_t i = 0; i < latencies_count; ++i) sum += latencies[i]; + + printf("Latency Statistics (ms):\n"); + printf(" Average: %.4f ms\n", (sum / latencies_count) * 1000.0); + printf(" Min: %.4f ms\n", latencies[0] * 1000.0); + printf(" Max: %.4f ms\n", latencies[latencies_count - 1] * 1000.0); + printf(" Median (p50): %.4f ms\n", latencies[(size_t)(latencies_count * 0.50)] * 1000.0); + printf(" 95th Percentile: %.4f ms\n", latencies[(size_t)(latencies_count * 0.95)] * 1000.0); + printf(" 99th Percentile: %.4f ms\n", latencies[(size_t)(latencies_count * 0.99)] * 1000.0); + printf("================================================================================\n"); +} + +// --- Main Function --- +int main() { + srand(time(NULL)); + latencies = malloc(sizeof(double) * MAX_LATENCIES); + if (!latencies) { + perror("malloc latencies"); + return 1; + } + + printf("Starting WebSocket Pub/Sub Load Test...\n"); + printf("Simulating %d subscribers and %d publishers.\n", NUM_SUBSCRIBERS, NUM_PUBLISHERS); + printf("Publishing at ~%d msg/sec for %d seconds.\n", NUM_PUBLISHERS * MESSAGES_PER_SECOND_PER_PUBLISHER, TEST_DURATION_S); + printf("--------------------------------------------------------------------------------\n"); + + epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + perror("epoll_create1"); + free(latencies); + return 1; + } + + // *** FIX: Allocate clients on the heap, not the stack *** + Client* clients = malloc(sizeof(Client) * TOTAL_CLIENTS); + if (!clients) { + perror("malloc clients"); + free(latencies); + close(epoll_fd); + return 1; + } + + struct sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(PORT); + inet_pton(AF_INET, HOST, &server_addr.sin_addr); + + for (int i = 0; i < TOTAL_CLIENTS; ++i) { + clients[i].fd = socket(AF_INET, SOCK_STREAM, 0); + fcntl(clients[i].fd, F_SETFL, O_NONBLOCK); + + clients[i].type = (i < NUM_SUBSCRIBERS) ? CLIENT_SUBSCRIBER : CLIENT_PUBLISHER; + clients[i].state = STATE_CONNECTING; + clients[i].read_len = 0; + clients[i].write_len = 0; + clients[i].write_pos = 0; + clients[i].next_send_time = 0; + + connect(clients[i].fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); + + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = &clients[i]; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clients[i].fd, &ev) == -1) { + perror("epoll_ctl: add"); + free(latencies); + free(clients); + close(epoll_fd); + return 1; + } + } + + struct epoll_event events[MAX_EVENTS]; + double start_time = get_time_double(); + double end_time = start_time + TEST_DURATION_S; + + while (get_time_double() < end_time) { + int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 10); // 10ms timeout + double now = get_time_double(); + + for (int i = 0; i < n; ++i) { + Client* client = (Client*)events[i].data.ptr; + if (events[i].events & (EPOLLERR | EPOLLHUP)) { + close_client(client); + continue; + } + + if (client->state == STATE_CONNECTING && (events[i].events & EPOLLOUT)) { + int result; + socklen_t result_len = sizeof(result); + getsockopt(client->fd, SOL_SOCKET, SO_ERROR, &result, &result_len); + if (result == 0) { + send_handshake(client); + } else { + close_client(client); + } + } else { + if (events[i].events & EPOLLIN) handle_read(client); + if (events[i].events & EPOLLOUT) handle_write(client); + } + } + + // Publisher logic + if (all_subscribed) { + for (int i = NUM_SUBSCRIBERS; i < TOTAL_CLIENTS; ++i) { + Client* client = &clients[i]; + if (client->state == STATE_RUNNING && client->write_len == 0 && now >= client->next_send_time) { + const char* channel = CHANNELS[rand() % NUM_CHANNELS]; + char message[256]; + int msg_len = snprintf(message, sizeof(message), "%.6f:Hello from publisher %d on channel %s", now, i - NUM_SUBSCRIBERS, channel); + + char pub_msg[384]; + int pub_msg_len = snprintf(pub_msg, sizeof(pub_msg), "pub %s %s", channel, message); + + client->write_len = create_ws_frame(pub_msg, pub_msg_len, client->write_buf); + messages_sent++; + + client->next_send_time = now + (1.0 / MESSAGES_PER_SECOND_PER_PUBLISHER); + epoll_ctl_mod(client->fd, EPOLLOUT | EPOLLIN | EPOLLET, client); + } + } + } + } + + printf("\nTest duration finished. Shutting down clients...\n"); + for (int i = 0; i < TOTAL_CLIENTS; ++i) { + close_client(&clients[i]); + } + + print_report(); + + free(clients); + free(latencies); + close(epoll_fd); + + return 0; +} diff --git a/rpubsub b/rpubsub new file mode 100755 index 0000000000000000000000000000000000000000..0c4913959f2554063fd9020ed5a27918d7bcc193 GIT binary patch literal 35336 zcmeHwe|%KcweLBT1QQIIprK90nn4GfsKgw8nJCnpFau|FqEtX*)ea$pc$4?7h}rYwfkyUirr}{S$0r~pzD$bTB)nTg3wo$)NFnjXeeUEbdYG=k z#Fs=-3w`0Su9?hk}8R@4Q(2mpof$PJ#$WY%KvZj zo6Y$$7ie-~Lyv|Q^ia?5d*CCx{8t}deZn(5zj*b~$IGpU+`PiW)m00oPJXz$d_r|q zP2#2DH>McMxE>18tX|OJj(lU5D(f{F(=I4qR zFZ#UaPh;-6^v{D0nV-1h;B({XQyK?P-S*Ao?~Q|Z#j)pa;>h0|2mfLm z{MtD9a2)*VIC_2(hyJcO`ZvXqvp9~N+tJZkz!UpC6Gwh!965K!p&x+Yo7Kw`@#KL1 z-dJS_cDXPzl(`x_rS%@yqSC4wV9J)3x+Vt8~Ml?nogc^2-<F zywL6OR4sCIc}q%R!qT!bcb#W)gqbPFj9cffudl6#7=$&oRBNT4s@fXp2*v8%Ws9kD z7u7Cy11K#Qn3)z+t>HffWu*;NMi#APK{blva#f*(6DPSE8p=vhb+BcPK~KGwe^?7& zf?Vbaw#GfTbV0QniZ5JLTf<%7rIFs05Y5DdnM)%6nb^08 zc@o9wGlF4m6Ovg-xC9}E#WmUlU_@agFd7GaYZO{KNlg@P2aO5e8kER57NYIz^S*CY zRgDsm{tCyv@ejYmbZMmUDu?U8Rh1$j-&Gu5$I_F9pK~~a@!ux=hQt5J(nkn?;P7S! zCkcPyaN}JhFH!g#hdoSwg3!g`jf~zTd<>W#eLbMh!!3_$_&h$}*YW*)9hTgrrR#Vd z?pvd$-wUYqA?DU}91<*h$;20SVv@+>vrM@F!M&}6{hZoscL;8P9w zbp|{fEAqT)z^6s9g0S9zHyiNp81TBUQt}1^ezbvpvjJ}$zXArlzSJeQP6M9S8~W32 zz>m?92>T59Zy50X2K+Y-_@DuwX}}K{@ZU1v2MzeU40wV271iNb13uY+*Vq1(oNmB> z+d!Xbz~5uQTMYOt13ue;A7{Yl8SwWS@c9P(eFl7?0dFaG0@+N-m<_g3*556EeqVT zz%2{hvcN41+_Jze3*556|1TD}Xuju~;ysvB~1VL#Dcue7L#oLkG&P))VGV%w4 z5FVe3Z*x|mK=hTA5;_(Rhbz`Gm^Oeyy-}DpenM?gm^OSue~H4h(G&Vz6sC=<(9feV zZTN&%Mq%0{2-Qbn+RzFuh{Ckt6M7~J(?(CIC<@aCPiSHkrj4D@y-}DpbV7GTVcN(E znW8Xl;Djy=MaoMXH=$!um^N%ey-}DpYC>&Mm^Nrae~H4hF%$Y-6s8TC(9feVZN!9D zMq%233Drko+IR^qh{Ckt5_%>I(?(0EC<@aCOK4&grj3=*y-}DpR6=(|VcJLunWAtO z!56-cl;1+|u_#O%DWTpdOdBYnwkS*+C!xPYVcIYW{VodAMoH-BQJ6MJLMx*%ZH$EK zqcClVgcd|$+6W0f6NPC5BvcfIY2za_F$&X$N9f)tOdB1cJEAacaD+@zm^L;-7j8t# zPa7JcV^NqkGD5vkm^M&DZ4o#;e(j5bP+`up39S8u$M5+5aCln`yeS5LHwONf82F!K z;9thTYh&P5G4N|K@ZuP_CI+sEfqxJKKN|x-6$8(VfxjCAPmO_dW8nK^;Co`=Z^Xc9 zG4O~OcH}{J{tpn5d(i31MiQ4_r$8zxni#ku2L3?|{A>*TR17>b2L5ggJarg6Pf<_E&&kiq^W?d66z}nVr~fUh zus$4CR-C2-AH|z$6`Tb}8qWdIkHJRqPnnB8_k9|N$`xPYc_)T2BU?xlmZRcdV_gG4ZC7{7n`t0aZC20DW`-n{f3`e$ z1{i`1LF}6gBFfOIex~kJlNw4$DTsUvIC>BzVvsbcZ&=?%H26AH+lGTX7oC5-W&R2K!mMiHA-`ai&}?0g^!FhgZm>f^ zO;rX@U4yn&T8gmiSC2W>a%-lN&}@B&QQ54s6@RmJ9S7$7zWpb>8q;#v-26?)_)VvJUGXOtI{b-0rF@s3Qnc|y@h9~_pfBH6 z@TIx=EcBPJ?8vlWa@NvAz~vpHx#eMqkiB0dnp-}E?uxfPRVjGUy42kKF8t8uD@(Nb zCfN$gt)9jW?@{(ruXNd_ei?ikWs}t%-9)LkM!#i(u2`h5TUB~LU**^M8Toj^MZ|l96RlB%S1uC>{P?RCIgWP ze&}MiP#8Q1MQ!Q@=C1ON78@yuVp2`ggXnq$qD5%3sT-|tBd(z2mA>gS=Ye|#xM3>w zJ+vk?h;AHD1ph|)`hJoZoIJ=P$x2JulN7uk5R?d>1&daXHuXv{9Z=|1aGSG);C&$R z=HG5^F4n{?BXKnl7kVBDFf}84s)*B0-<17xBc?AF&ldY$vp9Tzy2H&f-A0Q8n#rziyXdZk>|`zrJ&8+JR8Y2^=)P?c?J9!Zu8R?^bLw{ z%!5!>3cN?A9VjwySn~i`+1#>_rFzJiNubyQ-(`*({55jXytRd9bczo&tOW*Np_Hn( zST5A^C~3B`MYxdk07+L8-r{YrPste|y}Zj=E1t3x2THtQ`^G#u2SwRL1yf(N&JH~d z5?a?d$~v6B39lh>i(nxuki@D0IX5if5Vun+EOGc6^Oc+nj)ca%;DXDLK@IjGXdI$F zU-sDrz|AXQ4eubD;WEnJ2R0=k$qLC%-_m@iuQHEBaUQJ$IPOop-yvZecKVRN-{WWv*UKqaV`f{{;M9iBGvj)e;7N`HANyPrF!I^-cf9_t>yQ zO*q(f3Dx3H`WaBZeCBvg^^0ITVOs*r-wo|Xql>gRPg+p@inq)^B7yYDCw<-mfOf0o zV2Igh-GB-~5{=swRWQVLB4l-=sB5g70XQ)M2_OtEg+S&xe}scLMYPhHXiL%d)UpA1 zDIl*}MQtTC&8QEmafEqQEjn$7dRS2pb{zXjNitQW_w#vZtI2``=H`{C9R;IfPLI?7JO;z-Ci5yQgrXJ&N~>8;W>=9HF@njnfCuO8o^1m8Hq4ioae|3Wm(yziCO{Hj^xQ zQsHmc%-%l|E?t)9rY47egShVBaEJu+s&l|p$maY3mx)!#HmD+)|9UuNWu!Kg);pA9 zUbPdTgOMpKuG3g#{^??rWE(Bmx3E?JMc$sZcITh+G}dlte!qX3nxEG66sLMF=+Ua% z+`Nyv0{@hUKu7pg9e*e5hL_&Cji!i-`eSQJaP~!zVscf2%5$jO&CR5?Bj;;2s-vb9 zb?7}Bmz?TJ>g-!VzM0@B{nH-aVuct-!FFU-g{&Or8R$`3ojIZ4uRvC`qxqqIpoFn& zr#zoWB9K#3EyetQwss>09v(zM&-Sa!H!>VeY5Rc)F1|qH%Mc3S^iTgIAB!s*O*YwBSoOqiH-weoivBP(48= z4Zv;KRedu^w${q)sERDZS}0lwqdGJ8@P zO3`tw%0A}%h#>tFK&AF?I|({}xEG%Q=pJlC(gBT7iY9#k&Ih|7u$vj6UlY2lcYEk? z0LAZy>WKDizq%0om{A7E9x{yRP+O^GHs6*&4J?2G(?4n%>=s%M5!Bz9H*L*m1+~-v zjH&g@WW{^!4s$aNk45H98#71_T3v!T)Y_l!@CFja0aV@yoEQALx#C|5&8}Dre>bO zwL8r%4d_FquylD#1~fahHqxH_iFeZ)gL)yS`sEH*^KNW-(4d3n+HBo`S|%VHgS-?_ zRHwBtxZ@nUI1E{9pFm%;0+V&nG&C*{%*_WNha>6;(UJ`cY6RIqaX^a|GHJmJkU>Lo zw&I^V!bI&a&mrzp)VbDdM?r_#OOq{PEDV;Q@?l(N=!3v|mnz6VhI<;cVMZpj&3@#^)sQSE27D}dInS)YeHAP&Q zn?A&p1`>TfRW{VbBzxLuC+v`iNdp?CVxF-J9U}(8;1Da*sO3+B**B(%0`&vvrTC`2 z3c!ikAeb(iS4!{zr@A*d{VXPINyuHVkImF8@e=)vmNIRmNI|<|p4sjlOg4MJ15>|) znffe9z~*ZPhSXH+>NSk+9h14`3E)Y~e8oS<6e)>+X>w4)wjL6wJ+DV5)3s^WYUUiPY%f|%^tl(NUUz9zuL?%Nrd~e-73*+=R#M!F6h7{5o(_=}vMP^dy&JrMb#2;0 zmUVLeq|OJZ&={%6cqKvUj8?%V&az-|G5|J}WS!v<4Vxmea25p3Et{DF(0nHe!n(OI zAAv9sPI}jYrdEUwF*5#g2M_%T{kp@KiqdMPQCl58PwJqedPWYCzju;Zb9RZ{j+}ko z{y{SB%I)aF5tn+^{ob}@hi}?sMV%%UwAG&r{fT;a=8PZQMb6m4+{Q^WOn=F2OrV0F zpT=4U=5@*i(A4awIi{lwQ3qTKE`JoRuJ{#EM%fj0^2%Tlibw8-+1{9`D2GzEvJgDp z!7TIZ7co}^t@<;MW?EmZW16f^gIYfT7=En65DApMQ(B6ZVoM^W)HAOcQ1J zi5lRJBKw?)H&mzTMradUNDav5zd93#Du5XZy$ww831*k+3EzQ!L$KF>l23+Cu;G9D z7nte=gTS5&Vu|+=L5B!Bq_=N6|B%i&)Se$G>M`t14a}ZvmjY53sAUv3Jf>l$AbeGk ztJ4GjTVOE%{1pWSgPt)M=%1xHT7OEopSQnuectV#{tBa?@W&jloh`?c{~=O_J|rcXh#(D>jy#XH?ZQEaGrQsEdQ00f^8!3yQjbw|=D zz~W2b6^#6$B}nA@ki4Bqy>_y-w^`#&#?yys%5$V;k3A zLPU*{cI>W9?#h z$AZcW`$5f$nkkFdG&7KtUQ@?{{0j$d;sM#e@ewA97tO9hRxd9wnZyDKRK7#9c!5~2 zjFtUwe1llX3PGE=UskVCLfAx#TkYf8hy%s#)76BIr)6;$5$zdk6MJNpNxBTPUt`ki zE*wRtB+MLUxq#lF4eXO+8HI`&1sc%fIz8QU|&0CK0`4Ay&Ld zkx@=E!t96O7$~4tIuz5taT4dGNn6Qe#%L@muNV}P@#opqXlYSU+29ZRz#LmhG6Jc@ zN=j9DMV8L!CCZN}@+@Ad#K4L!t3CFV3-Y)wD(K~C`BV1rVv5~Nt7-6RRQ{u^8prj> z;*eI+HWABRRFPe}MCLfpi!FUZkuzB~wc5JXu6}64j3Qu9xn!4i6Zuwn#8f6qyhQn2 zL5-ZXVF`TNF5&+XVm_TEt2gXwi2UNDeOw3ZkU}g(ABOo)L#(Wxmqk_~sB>i1hxS6< zcc)D{LE&zB+*L%$QTmAwZ9)2&B4@HdeZkDZvwnIJW2lfJe|9}-^|R6j$Jvk-6wb-0_c9g!{}9jtJIlQxy8 zXZqj1mlTl z%l(W=7SB;@ISnuWnrZA5;x3w-LAq9zobGO&O)DbyvYC7Bx0mm1=D%f^HbGbCk{B?Mb_->(nY((P)86F z<#l-Q=dyIlu6{bp|JK7KgEBjX>fb}@7z)ITc6E~+AXJaqPzaCE|3jw0h->Z-6WGHeHdIj?J2B=I&9)j;%-HEGH6qe zlKQAMbm9kW(iLI|Lq#kq_omRCX{s*vIBF3{Vx>`L*SG_=6jo4;rl8xGVyIt)gwP6!@X`gP3c3B;iv9W`>jHdL9&<12^i?GF@ecUd2+yL4!3_-qe zz@+Y#F?mD|n2ez~m|Uuz1X3+@gVvU^6E$TfVVI(j-N4p1R!-@H>CmP1*)a)`h7dz% zGNiI;Yp-b>v(e=&-FWDOY$&^I63U*kpGt5BR>z#2872S&ca7T*>!8;d z!wN%oV**>HEGuE#q>nC?k@N~O*cBpZ`r&D=EaDAkdz8RsUH8< z>3YG)yRov+8jXxm3N(W#6Puf$rR>I>5j`K9?e{QG!D6KsJr`V&tPDbkyZj={w#|O&n%tF%7Ho<*Gm0AYt1x6WQrY$bT%+H$%JCC z(|MXAK@6Y4p`$>Zy^*0S10Toi(M|?rrn`(fL^V?tM63_flRV5^*qjrCkUll~42GhP zCXp$Ya-PYFOcJl5I=FJjBc|te$FR(_#=rxAVn*viWg-_DU0Mpf1sbv1=Ut0#{{T(S znf5T&*4WBGYh_~l&>0RB+dSFt2s|O=D6N}1$N!hGW z(-w1En5va&dsS-#)J9k{pxQ*Isr51hU^&W%171U0Sw){?jtAA+WK0@aRgodKVkjBC zzjXoTAZ&rx!xy=BOgABHAFma3PUpFH7=6iUh(01^HyXF7N!g|OK50o~at~?(^I)1b z1(~Y4*IhvOh5iJypQxB4P^Lf=!zM(7OqpI2=2K8d+-pa-f{qz3$3hg0Es)qiMAHZK zLd^F5HB(6n>42pWIABq}f@FgBU>!tLCDbEYK}c3VwoBWiY8&|Os~FAtsCEvc*ZqQ|2GC<{%_p5~YGEqvEMS7GV#f}{fOHO}>b6N?8Dk4(LIk1( zYug62Fd{a#@L{^py!5ELG#yhJlKc^dWDJIE5!z>y&Pr#oGN6QOSZwXFY3td;CQ^PYQR{hw9jR}eAhN+WuO+2O2dIbAEpP()bOBadFDlxxc z^VJh$Q8Cc|Fs74ljb(b2SY+`Oa>w4z5gN+i3a3zN86zCVNa{wU4ed5 zX@wuz)KgsXF7P!=EK@X$Bho1<1(r6d@{la{kp@ShK|9S)(7py_T3h#NiyF!(kcIhg zw#E)a^JNb#rxxbfL`=O-Y90npQV&4gZs`iVfTqBhlbw`4rQE_<>e9(Y{;l63G1_e6 zI&0~NO(IP}VB;&&RX8F|<+?DTBPPs$m3;r29LB>s!6CNB2hU!$hYTg{qStN)XyOcd ztchn&^NARii9 zPV7`?Wi%v&XJy*_KfK$Y_&~AvwRiCGC0TfNK9kOW&pFl0QOTiwAJZ0Uq41xD5SH_i ztRC@RPi}bSayyQGI)G|9>=})tpAqtEyKuQ3=RVtkEcVyljRO|UQBs}i+2Fd9?6tjA z_BYWv-37t=)Kkm(zAOadRg3%S+_?&5YM-~g&>L6|niDynIK)pww84vgU59Uk!{3-( zj9p~!7kKl)ER;MFVY_%z_$L`$?9_%-fOolDS%zgn-Ub1UkT&r;q2;JF9iPq`_#|_ucvwLK-d-z z;Hd1*)y^?O!9YEV(~E&Ui8T&Ht!RB-U6UYxr48qyWtoJ^Hv;} zeUBC6)v7C3968;Jc)(E*Yv#Dbmm+Gnt^G+k-Tn;GyeX}@tMRmXQ!h@M{z9(@ zlpTB8=Q$x1w_Ns2ajF+j;gI0ht4RR-6XiHdCu3c(QA?olgo5)S-gMd{=XKlGfm(ZC zYX#`QvgL860BUh(@Ts0~c;^gfGSOJ@75tEB{IAZIE-UJc!r)mlzxp36N44)s^~Do{ zF$)_`E2$WvVsax^pL8BEk6tz$_Kxt1)14q2_Ui9uAf$jV3opdrWcq!)b4Krw;&n~- zT^DXgsJ-DxFEflG6RT}XS-Vou{gX$X{&F1jP7mWOcF7$|Yj6a6`LW~JXIFY1DH$EX zO-%1&>=jii=c;a9KVQI_mkyuRc8~z8& z?xA*}cz2?eU=HTA!n0-}S7@&ImSo}_*P2d_{vwX#(7@Nzn*`~!Y)EA-sK8^*Z1mw= z`w0G)Lkh4rHc84u^iJVMlXkFO39s-?-vkQ9@3B6_4g^Wh;Hh0h;2sZR~)Sg#B1H8)d(+n7&+N!hcoc<2&T)Zm#HICHh%~7w@yM*3B6W)q| zczQE~MPy$K9U#j)?Re=Mx}0eVyjG*NgJNI#nZn|hvzc48(j?6P2Ih9K9ui+&XxVUHZW0{Q?!`Q$Ey>IR#H&oQdm0%4F$1_#J z+Y5rr-}Fs?0xC*f>Y;62Lk1mAjYS>TohZdu@#1#Vg3mIZEE;Fbmc`z%1eqG|ly zRqdBn^`A(M{GO|&w7lF>R$b+;@d%?vjv6VZAtt_AtFLESGtz7P~F@&Dt6_qO~;*k6Ol;gTJy7Kg?RQ&{AHzq$W2vmn7CN zcGp`PkU%)FEUK$^d)!FEqo>l(?z+n@wKbNy+IkN}S(a3JDp^=t|1$WspWSWHxCCM1 zlzh89Q=T%xULZ}LF!>RiJfUF940*zgDRNPsZASi-B1t0o%G|lLAC__@i=0Ijy zC22RizSdJ)R$JXLYUI4Sh4rQ7?nf<4+zaRzYu%nvBWFa4^{9oXj~Y4LG&1|OSri%~ zV7LF%M=qLSv(KERIG%XYS^U(~v%mMuoVoL!eXivB@0TtpD|c5ctgQOs%hii&YU^IX zuh2FwUb1xAkMg96lO|7@`bd7kgLox=_#@tDU#R};hv$ZlY+==O#NKVIxmgMvd zeAdgR+~U}T$2@>X;gh^A9DWC((i#rapCXhX3?f|J5e}zfNt}HE3m?FleOTLIeUgok z);EKAQ{5HZqgxv@?90Epk3TRF*6k06?X052Vn9+K<9rxN%);^z<1tidN8WixYJA;R_eqyo2x7>m-8ng~TBbK)R+`0R8<~~~ zjGUG%-!4$u={Ld!yp(#7XqoQNi|am?_mgebB*G4>NWOQ3seEKwwrsG;6Ste5i9yY}9s36Y{dT_vKJ#1h z4Lg->)~FG)Ur$(@=ucWR0tTLM8VUEX!4yIXKGQ(A9zHdW%K21UI?vxp`74w^>IV7m z8QGqeZQlV6cB5K8M7``uOSa!G$LJv6J~u77eaG$(_mD1cLbeBf|0eym2h(N7s1b_{ zy383(y1>%6o?8~UWr153xMhJ`7WjY10{ZXM>%T`&Lo7Y?oBQ;nY5@jpEkg4~dfvcq zKhX0nEr|T?J^pr$J;^-8UtzIF|NVQK>(Qg9Um6P661;)W(UJ(jlb51}BeX!!?{U&+ z+T%IBJ}%RIm!2_N5c$@pe%Y^SC|)d~Gf{(Sx<}7#S|EIrhxzoU$9Q!7QeJRf?*L1s zn+CK;&mVtR%O0H$f40QR2fH#9&Jq~h$qkDway^~#VVc9>XL&kKUl^>HgNy%f3-tTP zVg=?|G)y56XY+7A59@f?#KUzwT+hP|JPh!#n}_{89N?iaPGC1Px!K@iG9<^l8ThLhJX|za_ zawp~HO_(FH9T8KAxi5d%6EX{ z*_sKs8jkP&9wKdqtG5K2GWQrHp<|WZcp5@$~;kMxPQY)6eWsv~GdlhN@U*myeZGo=H^Yx^MsJCOm*U``ii3YS4t_cCmRKbL7sHV( zG|D#(PyBTpIh*3(>AujLmFrX-JncW-jQ+kj_-S$QPXV6}{WD_ppGWkj(UEzKUb+|J z$f=2gSL5K{ii7{RIQT#ud~Y25#W?uUsIQyZ?K^Ss`El@10#9}mV(eBDhrTfmesvuD zujAl1#=(CCyhX1hLwF4M(WY++`Z%X2or@z!!~}CRS!ljS2{QJi1AjBS-4_QxBM$yq z;M38+1y*yxJop~dCp~iiF5Bxw+6!^y{3s6o*KzRwljmz`8lEq`CUgfhTF*0N`&_{N zXB;{E;^0rk!QY63{{|-RH?!vhaqu(Z;Gd0yUla$wA`bo^fG0o6jIqyefVaRNvCm)Q z$k`hQ|0VGE#tH#!!UEzB6DISP;fcwFAC6+s*f{vyICx=BS-nTftrc9Z(gjs6Pw7IT zvb3hW+U>%1e7G}DsBf&P!HEZ6bd@F7-1^d*hKkzyMH)D-t{m6!8RNM)&}s10)z{YL z<_WlSZ=q{JV?~9#-o>vDtg0z@)z-VTd--r3Un!1%ss$c42us*mjdp#ar?lZ^*DHl#iy#|U+%>>5<+ZMb)wK&st6k-u+WH1pY2#8sKZYtt zwl}4qn;)xOrS~uXnbLJfTT-RK=$Z2;Czr&C&JS^0&1EjnB zhFvrmO~H+Sbhl#!LDvD&?S{h;e|OJiB$Y`XenVm;jYQ!FM7qUt818?6t>Ccx5hHxK zuu#7n(QxM@F1CzPOy+kmMlh7W&cyGu)UKwCP(b_Wm6H)H@xcwE7e+=>aDyWA=?H>I zU2ZgpQh`-Hl^54M4o{D{E;EuiS-*WUg2AMPrwL z4nr9)B#k7{wUl}ZnQZN9(r7xl_wef|UF8k6F0E7Jm#{{tab>2<@ibH}$7QUI4eoM& z#cG6rR2!BZzpOMufYN9;lSVMaf(Bn6;WJ#*8X*FkE_T=zt40ET=Vt`P1iKbAG;k9e zZ#9h&k$to(q&rOw=!xt&KY}n8OzVv!Nzjd7XBt5nsT**!?C=D3JL)hLRj_vF>2S1m zN$oHkd$AybDM0PvZc)A8qf19`dXuPbJh~h;!VJZ^_p)nWrAPuvHQvP=Nnj?`?$2eH zwGK z4(Ajw1z5C*-Us<_d1yaa#0Sef_GDt+LXRHm`@tGkqt)XS-iHsSJ`w(W&aa1gL}YmM zeH9%w6%iwUCFj>et4_#6eLbL~9Wng+{+1qEbV44+^5f^A1^g0NtzLe8pIZ<0{4srG zPptl>z|tPK&fmxN*TWLbzv-belAU#aeLuPiIAYQH^?h|c)c4Ur!XA=gd|m-Sd(=9= zzAvqZ^_*DOU#HVU+IFWsZynb6z4dSd=a1E&qXj=AWK*4A--p-304LP*k1hW{a{eMt zr|;|Qp?(fP{A9CO{@(&eHqqq^769~E*!QqIP6-oR|8D_j-(AcoPU6fgNNE5J$k-+J^m|jM56QS` +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // <--- FIX: Added for bool type +#include +#include +#include +#include // for strcasestr + +// --- Server Configuration --- +#define PORT 8080 +#define MAX_CLIENTS 65536 +#define MAX_EVENTS 2048 +#define READ_BUFFER_SIZE 8192 +#define WRITE_BUFFER_SIZE 262144 // 256KB per-client write buffer +#define MAX_FRAME_SIZE 65536 // 64KB max incoming frame +#define MAX_CHANNELS 1024 +#define MAX_SUBSCRIPTIONS 32 +#define WORKER_THREADS 4 // Number of threads for broadcasting +#define TASK_QUEUE_SIZE 16384 + +#define WEBSOCKET_KEY_MAGIC "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" +#define LISTEN_BACKLOG 32768 + +// Forward declarations +struct ChannelNode; + +// --- Data Structures --- + +// Circular write buffer for non-blocking sends +typedef struct { + uint8_t* data; + size_t capacity; + atomic_size_t head; + atomic_size_t tail; + pthread_spinlock_t lock; // Protects against concurrent writes from workers +} RingBuffer; + +typedef enum { + STATE_HANDSHAKE, + STATE_CONNECTED, + STATE_CLOSED +} ClientState; + +typedef struct { + ClientState state; + RingBuffer write_buf; + uint8_t* read_buf; + size_t read_len; + struct ChannelNode* subscriptions[MAX_SUBSCRIPTIONS]; + int sub_count; + atomic_char write_registered; // <--- FIX: Changed from atomic_bool to atomic_char +} Client; + +// Channel for pub/sub +typedef struct ChannelNode { + char name[64]; + int* subscribers; // Array of client FDs + int sub_count; + int sub_capacity; + pthread_rwlock_t lock; + struct ChannelNode* next; +} ChannelNode; + +typedef struct { + ChannelNode* buckets[256]; // Simple hash table for channels +} ChannelTable; + +// Task for worker threads to execute broadcasts +typedef struct { + struct ChannelNode* channel; + uint8_t* frame_data; + size_t frame_len; +} BroadcastTask; + +// Lock-free Single-Producer, Multi-Consumer queue for tasks +typedef struct { + BroadcastTask* tasks; + atomic_size_t head; + atomic_size_t tail; + size_t capacity; +} SPMCQueue; + +// --- Globals --- +Client* clients; +ChannelTable channels; +int epoll_fd; +int notify_pipe[2]; // Pipe for workers to signal main thread +SPMCQueue task_queue; +pthread_t worker_threads[WORKER_THREADS]; +volatile sig_atomic_t running = 1; +atomic_int active_connections = 0; + +// --- Function Prototypes --- +void remove_client(int fd, int gracefully); +void arm_write(int fd); + +// --- Utils --- +void handle_sigint(int sig) { running = 0; } + +static inline uint64_t get_ns_time() { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ts.tv_sec * 1000000000ULL + ts.tv_nsec; +} + +// --- Ring Buffer Implementation --- +void ring_buffer_init(RingBuffer* rb) { + rb->data = malloc(WRITE_BUFFER_SIZE); + rb->capacity = WRITE_BUFFER_SIZE; + atomic_init(&rb->head, 0); + atomic_init(&rb->tail, 0); + pthread_spin_init(&rb->lock, PTHREAD_PROCESS_PRIVATE); +} + +void ring_buffer_free(RingBuffer* rb) { + if (rb->data) free(rb->data); + pthread_spin_destroy(&rb->lock); +} + +// Tries to write data to the buffer. Used by worker threads. +int ring_buffer_write(RingBuffer* rb, const uint8_t* data, size_t len) { + pthread_spin_lock(&rb->lock); + size_t head = atomic_load_explicit(&rb->head, memory_order_relaxed); + size_t tail = atomic_load_explicit(&rb->tail, memory_order_relaxed); + size_t free_space = rb->capacity - (head - tail); + + if (len > free_space) { + pthread_spin_unlock(&rb->lock); + return 0; // Not enough space + } + + size_t head_idx = head % rb->capacity; + size_t to_end = rb->capacity - head_idx; + if (len <= to_end) { + memcpy(rb->data + head_idx, data, len); + } else { + memcpy(rb->data + head_idx, data, to_end); + memcpy(rb->data, data + to_end, len - to_end); + } + atomic_store_explicit(&rb->head, head + len, memory_order_release); + pthread_spin_unlock(&rb->lock); + return 1; +} + +// --- Task Queue --- +void queue_init(SPMCQueue* q) { + q->tasks = calloc(TASK_QUEUE_SIZE, sizeof(BroadcastTask)); + atomic_init(&q->head, 0); + atomic_init(&q->tail, 0); + q->capacity = TASK_QUEUE_SIZE; +} + +// Used by main I/O thread (single producer) +int queue_push(SPMCQueue* q, BroadcastTask task) { + size_t head = atomic_load_explicit(&q->head, memory_order_relaxed); + size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire); + if (head - tail >= q->capacity) { + return 0; // Queue full + } + q->tasks[head % q->capacity] = task; + atomic_store_explicit(&q->head, head + 1, memory_order_release); + return 1; +} + +// Used by worker threads (multi-consumer) +int queue_pop(SPMCQueue* q, BroadcastTask* task) { + while (1) { + size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed); + size_t head = atomic_load_explicit(&q->head, memory_order_acquire); + if (tail >= head) { + return 0; // Queue empty + } + *task = q->tasks[tail % q->capacity]; + if (atomic_compare_exchange_weak_explicit(&q->tail, &tail, tail + 1, memory_order_release, memory_order_relaxed)) { + return 1; + } + } +} + +// --- SHA-1 Implementation --- +typedef struct { uint32_t s[5]; uint32_t c[2]; uint8_t b[64]; } SHA1_CTX; +#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits)))) +#define blk0(i) (block->l[i] = (rol(block->l[i],24)&0xFF00FF00) | (rol(block->l[i],8)&0x00FF00FF)) +#define blk(i) (block->l[i&15] = rol(block->l[(i+13)&15]^block->l[(i+8)&15]^block->l[(i+2)&15]^block->l[i&15],1)) +#define R0(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk0(i)+0x5A827999+rol(v,5);w=rol(w,30); +#define R1(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk(i)+0x5A827999+rol(v,5);w=rol(w,30); +#define R2(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0x6ED9EBA1+rol(v,5);w=rol(w,30); +#define R3(v,w,x,y,z,i) z+=(((w|x)&y)|(w&x))+blk(i)+0x8F1BBCDC+rol(v,5);w=rol(w,30); +#define R4(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0xCA62C1D6+rol(v,5);w=rol(w,30); + +void SHA1_Transform(uint32_t s[5], const uint8_t buffer[64]) { + uint32_t a, b, c, d, e; + typedef union { uint8_t c[64]; uint32_t l[16]; } CHAR64LONG16; + CHAR64LONG16* block = (CHAR64LONG16*)buffer; + a = s[0]; b = s[1]; c = s[2]; d = s[3]; e = s[4]; + R0(a,b,c,d,e, 0); R0(e,a,b,c,d, 1); R0(d,e,a,b,c, 2); R0(c,d,e,a,b, 3); + R0(b,c,d,e,a, 4); R0(a,b,c,d,e, 5); R0(e,a,b,c,d, 6); R0(d,e,a,b,c, 7); + R0(c,d,e,a,b, 8); R0(b,c,d,e,a, 9); R0(a,b,c,d,e,10); R0(e,a,b,c,d,11); + R0(d,e,a,b,c,12); R0(c,d,e,a,b,13); R0(b,c,d,e,a,14); R0(a,b,c,d,e,15); + R1(e,a,b,c,d,16); R1(d,e,a,b,c,17); R1(c,d,e,a,b,18); R1(b,c,d,e,a,19); + R2(a,b,c,d,e,20); R2(e,a,b,c,d,21); R2(d,e,a,b,c,22); R2(c,d,e,a,b,23); + R2(b,c,d,e,a,24); R2(a,b,c,d,e,25); R2(e,a,b,c,d,26); R2(d,e,a,b,c,27); + R2(c,d,e,a,b,28); R2(b,c,d,e,a,29); R2(a,b,c,d,e,30); R2(e,a,b,c,d,31); + R2(d,e,a,b,c,32); R2(c,d,e,a,b,33); R2(b,c,d,e,a,34); R2(a,b,c,d,e,35); + R2(e,a,b,c,d,36); R2(d,e,a,b,c,37); R2(c,d,e,a,b,38); R2(b,c,d,e,a,39); + R3(a,b,c,d,e,40); R3(e,a,b,c,d,41); R3(d,e,a,b,c,42); R3(c,d,e,a,b,43); + R3(b,c,d,e,a,44); R3(a,b,c,d,e,45); R3(e,a,b,c,d,46); R3(d,e,a,b,c,47); + R3(c,d,e,a,b,48); R3(b,c,d,e,a,49); R3(a,b,c,d,e,50); R3(e,a,b,c,d,51); + R3(d,e,a,b,c,52); R3(c,d,e,a,b,53); R3(b,c,d,e,a,54); R3(a,b,c,d,e,55); + R3(e,a,b,c,d,56); R3(d,e,a,b,c,57); R3(c,d,e,a,b,58); R3(b,c,d,e,a,59); + R4(a,b,c,d,e,60); R4(e,a,b,c,d,61); R4(d,e,a,b,c,62); R4(c,d,e,a,b,63); + R4(b,c,d,e,a,64); R4(a,b,c,d,e,65); R4(e,a,b,c,d,66); R4(d,e,a,b,c,67); + R4(c,d,e,a,b,68); R4(b,c,d,e,a,69); R4(a,b,c,d,e,70); R4(e,a,b,c,d,71); + R4(d,e,a,b,c,72); R4(c,d,e,a,b,73); R4(b,c,d,e,a,74); R4(a,b,c,d,e,75); + R4(e,a,b,c,d,76); R4(d,e,a,b,c,77); R4(c,d,e,a,b,78); R4(b,c,d,e,a,79); + s[0] += a; s[1] += b; s[2] += c; s[3] += d; s[4] += e; +} + +void SHA1_Init(SHA1_CTX* c) { + c->s[0] = 0x67452301; c->s[1] = 0xEFCDAB89; c->s[2] = 0x98BADCFE; + c->s[3] = 0x10325476; c->s[4] = 0xC3D2E1F0; + c->c[0] = c->c[1] = 0; +} + +void SHA1_Update(SHA1_CTX* c, const uint8_t* d, uint32_t l) { + uint32_t i, j; j = (c->c[0] >> 3) & 63; + if ((c->c[0] += l << 3) < (l << 3)) c->c[1]++; c->c[1] += (l >> 29); + if ((j + l) > 63) { + memcpy(&c->b[j], d, (i = 64-j)); + SHA1_Transform(c->s, c->b); + for (; i + 63 < l; i += 64) SHA1_Transform(c->s, &d[i]); + j = 0; + } else i = 0; + memcpy(&c->b[j], &d[i], l - i); +} + +void SHA1_Final(uint8_t d[20], SHA1_CTX* c) { + uint32_t i; uint8_t fc[8]; + for (i = 0; i < 8; i++) + fc[i] = (uint8_t)((c->c[(i >= 4 ? 0 : 1)] >> ((3-(i & 3)) * 8)) & 255); + SHA1_Update(c, (uint8_t*)"\200", 1); + while ((c->c[0] & 504) != 448) SHA1_Update(c, (uint8_t*)"\0", 1); + SHA1_Update(c, fc, 8); + for (i = 0; i < 20; i++) + d[i] = (uint8_t)((c->s[i>>2] >> ((3-(i & 3)) * 8)) & 255); +} + +// --- Base64 Implementation --- +const char b64_table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +char* base64_encode(const uint8_t* data, size_t len) { + size_t out_len = 4 * ((len + 2) / 3); + char* out = malloc(out_len + 1); + if (!out) return NULL; + for (size_t i = 0, j = 0; i < len;) { + uint32_t a = i < len ? data[i++] : 0; + uint32_t b = i < len ? data[i++] : 0; + uint32_t c = i < len ? data[i++] : 0; + uint32_t t = (a << 16) + (b << 8) + c; + out[j++] = b64_table[(t >> 18) & 0x3F]; + out[j++] = b64_table[(t >> 12) & 0x3F]; + out[j++] = b64_table[(t >> 6) & 0x3F]; + out[j++] = b64_table[t & 0x3F]; + } + for (size_t i = 0; i < (3 - len % 3) % 3; i++) + out[out_len - 1 - i] = '='; + out[out_len] = '\0'; + return out; +} + +// --- Channel Management --- +uint8_t hash_channel(const char* name) { + uint8_t hash = 53; // A prime starting number + while (*name) hash = (hash * 31) + *name++; // Another prime multiplier + return hash; +} + +ChannelNode* find_or_create_channel(const char* name) { + uint8_t h = hash_channel(name); + ChannelNode* node = channels.buckets[h]; + while (node) { + if (strcmp(node->name, name) == 0) return node; + node = node->next; + } + node = calloc(1, sizeof(ChannelNode)); + strncpy(node->name, name, 63); + node->sub_capacity = 8; + node->subscribers = malloc(sizeof(int) * node->sub_capacity); + pthread_rwlock_init(&node->lock, NULL); + node->next = channels.buckets[h]; + channels.buckets[h] = node; + return node; +} + +void add_subscriber(ChannelNode* ch, int fd) { + pthread_rwlock_wrlock(&ch->lock); + if (ch->sub_count >= ch->sub_capacity) { + ch->sub_capacity *= 2; + ch->subscribers = realloc(ch->subscribers, sizeof(int) * ch->sub_capacity); + } + ch->subscribers[ch->sub_count++] = fd; + pthread_rwlock_unlock(&ch->lock); + + Client* c = &clients[fd]; + if (c->sub_count < MAX_SUBSCRIPTIONS) { + c->subscriptions[c->sub_count++] = ch; + } +} + +void remove_subscriber(ChannelNode* ch, int fd) { + pthread_rwlock_wrlock(&ch->lock); + for (int i = 0; i < ch->sub_count; i++) { + if (ch->subscribers[i] == fd) { + ch->subscribers[i] = ch->subscribers[--ch->sub_count]; + break; + } + } + pthread_rwlock_unlock(&ch->lock); +} + +// --- WebSocket Logic --- +void handle_handshake(int fd) { + Client* c = &clients[fd]; + char* req = (char*)c->read_buf; + if (!strstr(req, "\r\n\r\n")) return; + + char* key_start = strcasestr(req, "Sec-WebSocket-Key: "); + if (!key_start) { remove_client(fd, 0); return; } + key_start += 19; + char* key_end = strchr(key_start, '\r'); + if (!key_end) { remove_client(fd, 0); return; } + + char key[256]; + size_t key_len = key_end - key_start; + memcpy(key, key_start, key_len); + memcpy(key + key_len, WEBSOCKET_KEY_MAGIC, strlen(WEBSOCKET_KEY_MAGIC)); + key[key_len + strlen(WEBSOCKET_KEY_MAGIC)] = '\0'; + + uint8_t sha1[20]; + SHA1_CTX ctx; + SHA1_Init(&ctx); + SHA1_Update(&ctx, (uint8_t*)key, strlen(key)); + SHA1_Final(sha1, &ctx); + + char* accept = base64_encode(sha1, 20); + char response[256]; + int len = snprintf(response, sizeof(response), + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: %s\r\n\r\n", accept); + free(accept); + + if (send(fd, response, len, MSG_NOSIGNAL | MSG_DONTWAIT) == len) { + c->state = STATE_CONNECTED; + c->read_len = 0; // Clear handshake data + atomic_fetch_add(&active_connections, 1); + } else { + remove_client(fd, 0); + } +} + +void process_ws_message(int fd, uint8_t* payload, size_t len) { + payload[len] = '\0'; // Ensure null termination for string functions + char cmd[16], channel_name[64]; + + if (sscanf((char*)payload, "%15s %63s", cmd, channel_name) < 2) return; + + if (strcmp(cmd, "sub") == 0) { + ChannelNode* ch = find_or_create_channel(channel_name); + if (ch) add_subscriber(ch, fd); + } else if (strcmp(cmd, "pub") == 0) { + char* msg_start = (char*)payload + strlen(cmd) + 1 + strlen(channel_name) + 1; + if (msg_start >= (char*)payload + len) return; + size_t msg_len = len - (msg_start - (char*)payload); + + ChannelNode* ch = find_or_create_channel(channel_name); + if (!ch || ch->sub_count == 0) return; + + // Build WebSocket frame header once + uint8_t header[10]; + int header_len = 2; + header[0] = 0x81; // FIN + Text Frame + if (msg_len < 126) { + header[1] = msg_len; + } else { + header[1] = 126; + header[2] = (msg_len >> 8) & 0xFF; + header[3] = msg_len & 0xFF; + header_len = 4; + } + + // Allocate a single buffer for the entire frame + size_t frame_len = header_len + msg_len; + uint8_t* frame_data = malloc(frame_len); + if (!frame_data) return; + memcpy(frame_data, header, header_len); + memcpy(frame_data + header_len, msg_start, msg_len); + + BroadcastTask task = { .channel = ch, .frame_data = frame_data, .frame_len = frame_len }; + if (!queue_push(&task_queue, task)) { + // If queue is full, drop the message and free memory + free(frame_data); + } + } +} + +void handle_ws_data(int fd) { + Client* c = &clients[fd]; + uint8_t* buf = c->read_buf; + size_t len = c->read_len; + + while (len >= 2) { + uint64_t payload_len = buf[1] & 0x7F; + size_t header_len = 2; + if (payload_len == 126) { + if (len < 4) break; + payload_len = ((uint64_t)buf[2] << 8) | buf[3]; + header_len = 4; + } else if (payload_len == 127) { + if (len < 10) break; + payload_len = __builtin_bswap64(*(uint64_t*)(buf + 2)); + header_len = 10; + } + + if (payload_len > MAX_FRAME_SIZE) { remove_client(fd, 0); return; } + + size_t mask_offset = header_len; + size_t payload_offset = header_len + 4; + size_t total_frame_len = payload_offset + payload_len; + + if (len < total_frame_len) break; // Incomplete frame + + uint32_t* mask = (uint32_t*)(buf + mask_offset); + uint8_t* payload = buf + payload_offset; + + // Unmask payload (optimized for 4-byte chunks) + for (size_t i = 0; i < payload_len / 4; i++) { + ((uint32_t*)payload)[i] ^= *mask; + } + for (size_t i = payload_len - (payload_len % 4); i < payload_len; i++) { + payload[i] ^= ((uint8_t*)mask)[i % 4]; + } + + uint8_t opcode = buf[0] & 0x0F; + if (opcode == 0x01) { // Text + process_ws_message(fd, payload, payload_len); + } else if (opcode == 0x08) { // Close + remove_client(fd, 1); + return; + } else if (opcode == 0x09) { // Ping + uint8_t frame[12]; + frame[0] = 0x8A; // Pong frame + memcpy(frame + 2, payload, payload_len < 10 ? payload_len : 10); + ring_buffer_write(&c->write_buf, frame, 2 + payload_len); + arm_write(fd); + } + + memmove(buf, buf + total_frame_len, len - total_frame_len); + len -= total_frame_len; + } + c->read_len = len; +} + +// --- Network Event Handlers --- +void handle_read(int fd) { + Client* c = &clients[fd]; + ssize_t n = recv(fd, c->read_buf + c->read_len, READ_BUFFER_SIZE - c->read_len, MSG_DONTWAIT); + + if (n > 0) { + c->read_len += n; + if (c->state == STATE_HANDSHAKE) { + handle_handshake(fd); + } else if (c->state == STATE_CONNECTED) { + handle_ws_data(fd); + } + } else if (n == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { + remove_client(fd, 0); + } +} + +void handle_write(int fd) { + Client* c = &clients[fd]; + RingBuffer* rb = &c->write_buf; + + size_t tail = atomic_load_explicit(&rb->tail, memory_order_acquire); + size_t head = atomic_load_explicit(&rb->head, memory_order_acquire); + if (tail == head) return; // Nothing to write + + size_t tail_idx = tail % rb->capacity; + size_t head_idx = head % rb->capacity; + size_t len = (head > tail) ? (head - tail) : (rb->capacity - tail_idx + head_idx); + + ssize_t sent; + if (head_idx > tail_idx || tail_idx == head_idx) { // Data does not wrap or buffer is full but appears as non-wrapping + sent = send(fd, rb->data + tail_idx, len, MSG_NOSIGNAL | MSG_DONTWAIT); + } else { // Wraps around + struct iovec iov[2]; + iov[0].iov_base = rb->data + tail_idx; + iov[0].iov_len = rb->capacity - tail_idx; + iov[1].iov_base = rb->data; + iov[1].iov_len = head_idx; + sent = writev(fd, iov, 2); + } + + if (sent > 0) { + atomic_store_explicit(&rb->tail, tail + sent, memory_order_release); + } else if (errno != EAGAIN && errno != EWOULDBLOCK) { + remove_client(fd, 0); + return; + } + + // If buffer is not empty, we need to keep writing + if (atomic_load_explicit(&rb->tail, memory_order_relaxed) != atomic_load_explicit(&rb->head, memory_order_relaxed)) { + arm_write(fd); + } else { + atomic_store(&c->write_registered, 0); + } +} + +void handle_accept(int server_fd) { + while (1) { + int fd = accept4(server_fd, NULL, NULL, SOCK_NONBLOCK); + if (fd < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + perror("accept4"); + continue; + } + if (fd >= MAX_CLIENTS) { close(fd); continue; } + + int opt = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); + + Client* c = &clients[fd]; + memset(c, 0, sizeof(Client)); + c->state = STATE_HANDSHAKE; + c->read_buf = malloc(READ_BUFFER_SIZE); + ring_buffer_init(&c->write_buf); + atomic_init(&c->write_registered, 0); + + struct epoll_event ev = { .events = EPOLLIN | EPOLLET | EPOLLRDHUP, .data.fd = fd }; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + perror("epoll_ctl add client"); + free(c->read_buf); + ring_buffer_free(&c->write_buf); + close(fd); + } + } +} + +void remove_client(int fd, int gracefully) { + if (fd < 0 || fd >= MAX_CLIENTS || clients[fd].state == STATE_CLOSED) return; + + Client* c = &clients[fd]; + if (c->state == STATE_CONNECTED) { + atomic_fetch_sub(&active_connections, 1); + } + c->state = STATE_CLOSED; + + // Unsubscribe from channels efficiently + for (int i = 0; i < c->sub_count; i++) { + if (c->subscriptions[i]) { + remove_subscriber(c->subscriptions[i], fd); + } + } + + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); + close(fd); + free(c->read_buf); + ring_buffer_free(&c->write_buf); +} + +// --- Worker Thread Logic --- +void execute_broadcast(BroadcastTask* task) { + ChannelNode* ch = task->channel; + pthread_rwlock_rdlock(&ch->lock); + + // Create a temporary copy to avoid holding the lock for too long + int num_subs = ch->sub_count; + if (num_subs == 0) { + pthread_rwlock_unlock(&ch->lock); + return; + } + + int* subs_copy = malloc(sizeof(int) * num_subs); + if (subs_copy) { + memcpy(subs_copy, ch->subscribers, sizeof(int) * num_subs); + } + pthread_rwlock_unlock(&ch->lock); + + if (!subs_copy) return; + + for (int i = 0; i < num_subs; i++) { + int fd = subs_copy[i]; + if (fd < 0 || fd >= MAX_CLIENTS) continue; + + Client* c = &clients[fd]; + if (c->state != STATE_CONNECTED) continue; + + // Check if write buffer was empty before adding data + size_t head = atomic_load_explicit(&c->write_buf.head, memory_order_relaxed); + size_t tail = atomic_load_explicit(&c->write_buf.tail, memory_order_relaxed); + int was_empty = (head == tail); + + if (ring_buffer_write(&c->write_buf, task->frame_data, task->frame_len)) { + // If it was empty, we need to tell the I/O thread to arm EPOLLOUT + if (was_empty) { + arm_write(fd); + } + } + } + free(subs_copy); +} + +void* worker_main(void* arg) { + int id = *(int*)arg; + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + if (id + 1 < sysconf(_SC_NPROCESSORS_ONLN)) { + CPU_SET(id + 1, &cpuset); // Pin workers to cores 1, 2, 3... + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + } + + while (running) { + BroadcastTask task; + if (queue_pop(&task_queue, &task)) { + execute_broadcast(&task); + free(task.frame_data); // Free the frame after broadcasting + } else { + usleep(100); // Sleep briefly if queue is empty + } + } + return NULL; +} + +// Safely tells the main I/O thread to arm EPOLLOUT for a given FD +void arm_write(int fd) { + if (fd < 0 || fd >= MAX_CLIENTS) return; + Client* c = &clients[fd]; + // Use CAS to avoid redundant pipe writes and epoll_ctl calls + char expected = 0; // <--- FIX: Changed from bool to char + if (atomic_compare_exchange_strong(&c->write_registered, &expected, 1)) { + write(notify_pipe[1], &fd, sizeof(fd)); + } +} + +// --- Main Server --- +int main() { + signal(SIGINT, handle_sigint); + signal(SIGPIPE, SIG_IGN); + + clients = calloc(MAX_CLIENTS, sizeof(Client)); + queue_init(&task_queue); + + // Create server socket + int server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + int opt = 1; + setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + struct sockaddr_in addr = { .sin_family = AF_INET, .sin_port = htons(PORT), .sin_addr.s_addr = INADDR_ANY }; + if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { perror("bind"); return 1; } + if (listen(server_fd, LISTEN_BACKLOG) < 0) { perror("listen"); return 1; } + + epoll_fd = epoll_create1(0); + struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = server_fd }; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev); + + // Create pipe for thread communication + if (pipe2(notify_pipe, O_NONBLOCK) < 0) { perror("pipe2"); return 1; } + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = notify_pipe[0]; + epoll_ctl(epoll_fd, EPOLL_CTL_ADD, notify_pipe[0], &ev); + + // Pin main I/O thread to core 0 + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + sched_setaffinity(0, sizeof(cpuset), &cpuset); + + // Start worker threads + int worker_ids[WORKER_THREADS]; + for (int i = 0; i < WORKER_THREADS; i++) { + worker_ids[i] = i; + pthread_create(&worker_threads[i], NULL, worker_main, &worker_ids[i]); + } + + printf("Server started on port %d with %d worker threads.\n", PORT, WORKER_THREADS); + + struct epoll_event events[MAX_EVENTS]; + uint64_t last_stats_time = get_ns_time(); + + while (running) { + int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 200); + for (int i = 0; i < n; i++) { + int fd = events[i].data.fd; + uint32_t e = events[i].events; + + if (fd == server_fd) { + handle_accept(server_fd); + } else if (fd == notify_pipe[0]) { + int client_fd; + while (read(notify_pipe[0], &client_fd, sizeof(client_fd)) > 0) { + struct epoll_event client_ev = { + .events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP, + .data.fd = client_fd + }; + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, client_fd, &client_ev); + } + } else { + if (e & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + remove_client(fd, 0); + continue; + } + if (e & EPOLLIN) handle_read(fd); + if (e & EPOLLOUT) handle_write(fd); + } + } + + uint64_t now = get_ns_time(); + if (now - last_stats_time > 5000000000ULL) { + printf("Active connections: %d\n", atomic_load(&active_connections)); + last_stats_time = now; + } + } + + printf("Shutting down...\n"); + for (int i = 0; i < WORKER_THREADS; i++) { + pthread_join(worker_threads[i], NULL); + } + + close(server_fd); + close(notify_pipe[0]); + close(notify_pipe[1]); + close(epoll_fd); + free(clients); + // ... further cleanup for channel structures etc. would be ideal in a real app ... + + printf("Server shutdown complete.\n"); + return 0; +} diff --git a/test.py b/test.py new file mode 100644 index 0000000..7408f2c --- /dev/null +++ b/test.py @@ -0,0 +1,148 @@ +import asyncio +import time +import random +import statistics +from collections import deque +import websockets + +# --- Test Configuration --- +HOST = "127.0.0.1" +PORT = 8080 +URI = f"ws://{HOST}:{PORT}" + +# Client setup +NUM_SUBSCRIBERS = 1000 +NUM_PUBLISHERS = 10 +CHANNELS = ["news", "sports", "tech", "finance", "weather"] + +# Test execution +TEST_DURATION_S = 15 +MESSAGES_PER_SECOND_PER_PUBLISHER = 100 # Increased message rate + +# --- Global State & Metrics --- +latencies = deque() +messages_sent = 0 +messages_received = 0 +subscriber_setup_count = 0 +all_subscribed_event = asyncio.Event() + +async def subscriber_client(client_id: int): + global subscriber_setup_count, messages_received + channel = random.choice(CHANNELS) + + try: + async with websockets.connect(URI) as websocket: + await websocket.send(f"sub {channel}") + subscriber_setup_count += 1 + if subscriber_setup_count == NUM_SUBSCRIBERS: + print("✅ All subscribers are connected and subscribed. Starting publishers...") + all_subscribed_event.set() + + while True: + message = await websocket.recv() + try: + sent_time_str = message.split(":", 1)[0] + sent_time = float(sent_time_str) + latency = time.time() - sent_time + latencies.append(latency) + messages_received += 1 + except (ValueError, IndexError): + print(f"Warning: Received malformed message: {message}") + + except (websockets.exceptions.ConnectionClosedError, ConnectionRefusedError) as e: + print(f"Subscriber {client_id} disconnected: {e}") + except asyncio.CancelledError: + pass + except Exception as e: + print(f"An unexpected error occurred in subscriber {client_id}: {e}") + +async def publisher_client(client_id: int): + global messages_sent + await all_subscribed_event.wait() + + sleep_interval = 1.0 / MESSAGES_PER_SECOND_PER_PUBLISHER + + try: + async with websockets.connect(URI) as websocket: + while True: + channel = random.choice(CHANNELS) + send_time = time.time() + message = f"{send_time:.6f}:Hello from publisher {client_id} on channel {channel}" + + await websocket.send(f"pub {channel} {message}") + messages_sent += 1 + + await asyncio.sleep(sleep_interval) + + except (websockets.exceptions.ConnectionClosedError, ConnectionRefusedError) as e: + print(f"Publisher {client_id} disconnected: {e}") + except asyncio.CancelledError: + pass + except Exception as e: + print(f"An unexpected error occurred in publisher {client_id}: {e}") + +def print_report(): + print("\n" + "="*80) + print("PERFORMANCE REPORT".center(80)) + print("="*80) + + if not latencies: + print("No messages were received. Cannot generate a report. Is the server running?") + return + + total_sent = messages_sent + total_received = messages_received + message_loss = max(0, total_sent - total_received) + loss_rate = (message_loss / total_sent * 100) if total_sent > 0 else 0 + throughput = total_received / TEST_DURATION_S + + print(f"Test Duration: {TEST_DURATION_S} seconds") + print(f"Total Messages Sent: {total_sent}") + print(f"Total Messages Rcvd: {total_received}") + print(f"Message Loss: {message_loss} ({loss_rate:.2f}%)") + print(f"Actual Throughput: {throughput:.2f} msg/sec") + print("-"*80) + + sorted_latencies = sorted(latencies) + avg_latency_ms = statistics.mean(sorted_latencies) * 1000 + min_latency_ms = sorted_latencies[0] * 1000 + max_latency_ms = sorted_latencies[-1] * 1000 + p50_latency_ms = statistics.median(sorted_latencies) * 1000 + p95_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.95)] * 1000 + p99_latency_ms = sorted_latencies[int(len(sorted_latencies) * 0.99)] * 1000 + + print("Latency Statistics (ms):") + print(f" Average: {avg_latency_ms:.4f} ms") + print(f" Min: {min_latency_ms:.4f} ms") + print(f" Max: {max_latency_ms:.4f} ms") + print(f" Median (p50): {p50_latency_ms:.4f} ms") + print(f" 95th Percentile: {p95_latency_ms:.4f} ms") + print(f" 99th Percentile: {p99_latency_ms:.4f} ms") + print("="*80) + +async def main(): + print("Starting WebSocket Pub/Sub Load Test...") + print(f"Simulating {NUM_SUBSCRIBERS} subscribers and {NUM_PUBLISHERS} publishers.") + print(f"Publishing at ~{NUM_PUBLISHERS * MESSAGES_PER_SECOND_PER_PUBLISHER} msg/sec for {TEST_DURATION_S} seconds.") + print("-"*80) + + subscriber_tasks = [asyncio.create_task(subscriber_client(i)) for i in range(NUM_SUBSCRIBERS)] + publisher_tasks = [asyncio.create_task(publisher_client(i)) for i in range(NUM_PUBLISHERS)] + all_tasks = subscriber_tasks + publisher_tasks + + try: + await asyncio.sleep(TEST_DURATION_S) + finally: + print("\nTest duration finished. Shutting down clients...") + for task in all_tasks: + task.cancel() + + await asyncio.gather(*all_tasks, return_exceptions=True) + print_report() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nTest interrupted by user.") +