From 56d0ef01fa26140c1837ceff1709e1f0c7a1ad9b Mon Sep 17 00:00:00 2001 From: retoor Date: Mon, 29 Dec 2025 01:25:28 +0100 Subject: [PATCH] Socket upgrade. --- src/snek/service/socket.py | 128 +++++++++++++++++++------------- src/snek/static/message-list.js | 22 ++++-- 2 files changed, 90 insertions(+), 60 deletions(-) diff --git a/src/snek/service/socket.py b/src/snek/service/socket.py index 47c4212..13c65fc 100644 --- a/src/snek/service/socket.py +++ b/src/snek/service/socket.py @@ -83,6 +83,7 @@ class SocketService(BaseService): self.users = {} self.subscriptions = {} self.last_update = str(datetime.now()) + self._lock = asyncio.Lock() async def user_availability_service(self): @@ -139,14 +140,15 @@ class SocketService(BaseService): username = safe_get(user, "username", "unknown") nick = safe_get(user, "nick") or username color = s.user_color - self.sockets.add(s) - is_first_connection = False - if user_uid not in self.users: - self.users[user_uid] = set() - is_first_connection = True - elif len(self.users[user_uid]) == 0: - is_first_connection = True - self.users[user_uid].add(s) + async with self._lock: + self.sockets.add(s) + is_first_connection = False + if user_uid not in self.users: + self.users[user_uid] = set() + is_first_connection = True + elif len(self.users[user_uid]) == 0: + is_first_connection = True + self.users[user_uid].add(s) try: fresh_user = await self.app.services.user.get(uid=user_uid) if fresh_user: @@ -166,23 +168,42 @@ class SocketService(BaseService): if not ws or not channel_uid or not user_uid: return False try: - existing_socket = None - user_sockets = self.users.get(user_uid, set()) - for sock in user_sockets: - if sock and sock.ws == ws: - existing_socket = sock - break - if not existing_socket: - return False - existing_socket.subscribed_channels.add(channel_uid) - if channel_uid not in self.subscriptions: - self.subscriptions[channel_uid] = set() - self.subscriptions[channel_uid].add(user_uid) + async with self._lock: + existing_socket = None + user_sockets = self.users.get(user_uid, set()) + for sock in user_sockets: + if sock and sock.ws == ws: + existing_socket = sock + break + if not existing_socket: + return False + existing_socket.subscribed_channels.add(channel_uid) + if channel_uid not in self.subscriptions: + self.subscriptions[channel_uid] = set() + self.subscriptions[channel_uid].add(user_uid) return True except Exception as ex: logger.warning(f"Failed to subscribe: {safe_str(ex)}") return False + async def unsubscribe(self, ws, channel_uid, user_uid): + if not ws or not channel_uid or not user_uid: + return False + try: + async with self._lock: + if channel_uid in self.subscriptions: + self.subscriptions[channel_uid].discard(user_uid) + if len(self.subscriptions[channel_uid]) == 0: + del self.subscriptions[channel_uid] + for s in self.sockets: + if s and s.ws == ws: + s.subscribed_channels.discard(channel_uid) + break + return True + except Exception as ex: + logger.warning(f"Failed to unsubscribe: {safe_str(ex)}") + return False + async def send_to_user(self, user_uid, message): if not user_uid or message is None: return 0 @@ -250,40 +271,41 @@ class SocketService(BaseService): async def delete(self, ws): if not ws: return - sockets_to_remove = [sock for sock in self.sockets if sock and sock.ws == ws] - for s in sockets_to_remove: - self.sockets.discard(s) - departures_to_broadcast = [] - channels_to_cleanup = set() - for s in sockets_to_remove: - user_uid = s.user_uid - if not user_uid: - continue - is_last_connection = False - if user_uid in self.users: - self.users[user_uid].discard(s) - if len(self.users[user_uid]) == 0: - del self.users[user_uid] - is_last_connection = True - if is_last_connection: - user_nick = None + async with self._lock: + sockets_to_remove = [sock for sock in self.sockets if sock and sock.ws == ws] + for s in sockets_to_remove: + self.sockets.discard(s) + departures_to_broadcast = [] + channels_to_cleanup = set() + for s in sockets_to_remove: + user_uid = s.user_uid + if not user_uid: + continue + is_last_connection = False + if user_uid in self.users: + self.users[user_uid].discard(s) + if len(self.users[user_uid]) == 0: + del self.users[user_uid] + is_last_connection = True + if is_last_connection: + user_nick = None + try: + if s.user: + user_nick = safe_get(s.user, "nick") or safe_get(s.user, "username") + except Exception: + pass + if user_nick: + departures_to_broadcast.append((user_uid, user_nick, s.user_color)) + for channel_uid in list(s.subscribed_channels): + channels_to_cleanup.add((channel_uid, user_uid)) + for channel_uid, user_uid in channels_to_cleanup: try: - if s.user: - user_nick = safe_get(s.user, "nick") or safe_get(s.user, "username") - except Exception: - pass - if user_nick: - departures_to_broadcast.append((user_uid, user_nick, s.user_color)) - for channel_uid in list(s.subscribed_channels): - channels_to_cleanup.add((channel_uid, user_uid)) - for channel_uid, user_uid in channels_to_cleanup: - try: - if channel_uid in self.subscriptions: - self.subscriptions[channel_uid].discard(user_uid) - if len(self.subscriptions[channel_uid]) == 0: - del self.subscriptions[channel_uid] - except Exception as ex: - logger.debug(f"Failed to cleanup channel subscription: {safe_str(ex)}") + if channel_uid in self.subscriptions: + self.subscriptions[channel_uid].discard(user_uid) + if len(self.subscriptions[channel_uid]) == 0: + del self.subscriptions[channel_uid] + except Exception as ex: + logger.debug(f"Failed to cleanup channel subscription: {safe_str(ex)}") for s in sockets_to_remove: try: username = safe_get(s.user, "username", "unknown") if s.user else "unknown" diff --git a/src/snek/static/message-list.js b/src/snek/static/message-list.js index 00a58ff..c2b0fe4 100644 --- a/src/snek/static/message-list.js +++ b/src/snek/static/message-list.js @@ -157,12 +157,10 @@ class MessageList extends HTMLElement { threshold: 0, }); - // End-of-messages marker this.endOfMessages = document.createElement('div'); this.endOfMessages.classList.add('message-list-bottom'); this.prepend(this.endOfMessages); - // Observe existing children and index by uid for (const c of this.children) { this._observer.observe(c); if (c instanceof MessageElement) { @@ -170,20 +168,30 @@ class MessageList extends HTMLElement { } } - // Wire up socket events - app.ws.addEventListener("update_message_text", (data) => { + this._updateMessageHandler = (data) => { if (this.messageMap.has(data.uid)) { this.upsertMessage(data); } - }); - app.ws.addEventListener("set_typing", (data) => { + }; + this._typingHandler = (data) => { if (app._debug) console.debug("set_typing event received:", data); this.triggerGlow(data.user_uid, data.color); - }); + }; + + app.ws.addEventListener("update_message_text", this._updateMessageHandler); + app.ws.addEventListener("set_typing", this._typingHandler); this.scrollToBottom(true); } + disconnectedCallback() { + if (this._observer) { + this._observer.disconnect(); + } + app.ws.removeEventListener("update_message_text", this._updateMessageHandler); + app.ws.removeEventListener("set_typing", this._typingHandler); + } + connectedCallback() { this.addEventListener('click', (e) => { if (