From 3594ac1f5984953487e0c3423c9672b01e416c28 Mon Sep 17 00:00:00 2001 From: retoor <retoor@molodetz.nl> Date: Thu, 10 Apr 2025 13:34:32 +0200 Subject: [PATCH] Performance upgrade. --- src/snek/service/channel_member.py | 4 ++++ src/snek/service/socket.py | 14 +++++++------- src/snek/view/rpc.py | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/snek/service/channel_member.py b/src/snek/service/channel_member.py index 16d1887..a2300b6 100644 --- a/src/snek/service/channel_member.py +++ b/src/snek/service/channel_member.py @@ -10,6 +10,10 @@ class ChannelMemberService(BaseService): channel_member["new_count"] = 0 return await self.save(channel_member) + async def get_user_uids(self, channel_uid): + async for model in self.mapper.query("SELECT user_uid FROM channel_member WHERE channel_uid=:channel_uid", {"channel_uid": channel_uid}): + yield model["user_uid"] + async def create( self, channel_uid, diff --git a/src/snek/service/socket.py b/src/snek/service/socket.py index 072a86f..c084eb9 100644 --- a/src/snek/service/socket.py +++ b/src/snek/service/socket.py @@ -16,9 +16,8 @@ class SocketService(BaseService): try: await self.ws.send_json(data) except Exception as ex: - print(ex, flush=True) self.is_connected = False - return True + return self.is_connected async def close(self): if not self.is_connected: @@ -43,7 +42,6 @@ class SocketService(BaseService): self.users[user_uid].add(s) async def subscribe(self, ws, channel_uid, user_uid): - return if channel_uid not in self.subscriptions: self.subscriptions[channel_uid] = set() s = self.Socket(ws, await self.app.services.user.get(uid=user_uid)) @@ -57,10 +55,12 @@ class SocketService(BaseService): return count async def broadcast(self, channel_uid, message): - async for channel_member in self.app.services.channel_member.find( - channel_uid=channel_uid - ): - await self.send_to_user(channel_member["user_uid"], message) + try: + async for user_uid in self.services.channel_member.get_user_uids(channel_uid): + print(user_uid, flush=True) + await self.send_to_user(user_uid, message) + except Exception as ex: + print(ex, flush=True) return True async def delete(self, ws): diff --git a/src/snek/view/rpc.py b/src/snek/view/rpc.py index 19c98d4..3161f49 100644 --- a/src/snek/view/rpc.py +++ b/src/snek/view/rpc.py @@ -273,7 +273,7 @@ class RPCView(BaseView): async with Profiler(): await rpc(msg.json()) except Exception as ex: - print(ex, flush=True) + print("Deleting socket", ex, flush=True) await self.services.socket.delete(ws) break elif msg.type == web.WSMsgType.ERROR: