Socket upgrade.

This commit is contained in:
retoor 2025-12-29 01:25:28 +01:00
parent 163828e213
commit 56d0ef01fa
2 changed files with 90 additions and 60 deletions

View File

@ -83,6 +83,7 @@ class SocketService(BaseService):
self.users = {} self.users = {}
self.subscriptions = {} self.subscriptions = {}
self.last_update = str(datetime.now()) self.last_update = str(datetime.now())
self._lock = asyncio.Lock()
async def user_availability_service(self): async def user_availability_service(self):
@ -139,14 +140,15 @@ class SocketService(BaseService):
username = safe_get(user, "username", "unknown") username = safe_get(user, "username", "unknown")
nick = safe_get(user, "nick") or username nick = safe_get(user, "nick") or username
color = s.user_color color = s.user_color
self.sockets.add(s) async with self._lock:
is_first_connection = False self.sockets.add(s)
if user_uid not in self.users: is_first_connection = False
self.users[user_uid] = set() if user_uid not in self.users:
is_first_connection = True self.users[user_uid] = set()
elif len(self.users[user_uid]) == 0: is_first_connection = True
is_first_connection = True elif len(self.users[user_uid]) == 0:
self.users[user_uid].add(s) is_first_connection = True
self.users[user_uid].add(s)
try: try:
fresh_user = await self.app.services.user.get(uid=user_uid) fresh_user = await self.app.services.user.get(uid=user_uid)
if fresh_user: if fresh_user:
@ -166,23 +168,42 @@ class SocketService(BaseService):
if not ws or not channel_uid or not user_uid: if not ws or not channel_uid or not user_uid:
return False return False
try: try:
existing_socket = None async with self._lock:
user_sockets = self.users.get(user_uid, set()) existing_socket = None
for sock in user_sockets: user_sockets = self.users.get(user_uid, set())
if sock and sock.ws == ws: for sock in user_sockets:
existing_socket = sock if sock and sock.ws == ws:
break existing_socket = sock
if not existing_socket: break
return False if not existing_socket:
existing_socket.subscribed_channels.add(channel_uid) return False
if channel_uid not in self.subscriptions: existing_socket.subscribed_channels.add(channel_uid)
self.subscriptions[channel_uid] = set() if channel_uid not in self.subscriptions:
self.subscriptions[channel_uid].add(user_uid) self.subscriptions[channel_uid] = set()
self.subscriptions[channel_uid].add(user_uid)
return True return True
except Exception as ex: except Exception as ex:
logger.warning(f"Failed to subscribe: {safe_str(ex)}") logger.warning(f"Failed to subscribe: {safe_str(ex)}")
return False return False
async def unsubscribe(self, ws, channel_uid, user_uid):
if not ws or not channel_uid or not user_uid:
return False
try:
async with self._lock:
if channel_uid in self.subscriptions:
self.subscriptions[channel_uid].discard(user_uid)
if len(self.subscriptions[channel_uid]) == 0:
del self.subscriptions[channel_uid]
for s in self.sockets:
if s and s.ws == ws:
s.subscribed_channels.discard(channel_uid)
break
return True
except Exception as ex:
logger.warning(f"Failed to unsubscribe: {safe_str(ex)}")
return False
async def send_to_user(self, user_uid, message): async def send_to_user(self, user_uid, message):
if not user_uid or message is None: if not user_uid or message is None:
return 0 return 0
@ -250,40 +271,41 @@ class SocketService(BaseService):
async def delete(self, ws): async def delete(self, ws):
if not ws: if not ws:
return return
sockets_to_remove = [sock for sock in self.sockets if sock and sock.ws == ws] async with self._lock:
for s in sockets_to_remove: sockets_to_remove = [sock for sock in self.sockets if sock and sock.ws == ws]
self.sockets.discard(s) for s in sockets_to_remove:
departures_to_broadcast = [] self.sockets.discard(s)
channels_to_cleanup = set() departures_to_broadcast = []
for s in sockets_to_remove: channels_to_cleanup = set()
user_uid = s.user_uid for s in sockets_to_remove:
if not user_uid: user_uid = s.user_uid
continue if not user_uid:
is_last_connection = False continue
if user_uid in self.users: is_last_connection = False
self.users[user_uid].discard(s) if user_uid in self.users:
if len(self.users[user_uid]) == 0: self.users[user_uid].discard(s)
del self.users[user_uid] if len(self.users[user_uid]) == 0:
is_last_connection = True del self.users[user_uid]
if is_last_connection: is_last_connection = True
user_nick = None if is_last_connection:
user_nick = None
try:
if s.user:
user_nick = safe_get(s.user, "nick") or safe_get(s.user, "username")
except Exception:
pass
if user_nick:
departures_to_broadcast.append((user_uid, user_nick, s.user_color))
for channel_uid in list(s.subscribed_channels):
channels_to_cleanup.add((channel_uid, user_uid))
for channel_uid, user_uid in channels_to_cleanup:
try: try:
if s.user: if channel_uid in self.subscriptions:
user_nick = safe_get(s.user, "nick") or safe_get(s.user, "username") self.subscriptions[channel_uid].discard(user_uid)
except Exception: if len(self.subscriptions[channel_uid]) == 0:
pass del self.subscriptions[channel_uid]
if user_nick: except Exception as ex:
departures_to_broadcast.append((user_uid, user_nick, s.user_color)) logger.debug(f"Failed to cleanup channel subscription: {safe_str(ex)}")
for channel_uid in list(s.subscribed_channels):
channels_to_cleanup.add((channel_uid, user_uid))
for channel_uid, user_uid in channels_to_cleanup:
try:
if channel_uid in self.subscriptions:
self.subscriptions[channel_uid].discard(user_uid)
if len(self.subscriptions[channel_uid]) == 0:
del self.subscriptions[channel_uid]
except Exception as ex:
logger.debug(f"Failed to cleanup channel subscription: {safe_str(ex)}")
for s in sockets_to_remove: for s in sockets_to_remove:
try: try:
username = safe_get(s.user, "username", "unknown") if s.user else "unknown" username = safe_get(s.user, "username", "unknown") if s.user else "unknown"

View File

@ -157,12 +157,10 @@ class MessageList extends HTMLElement {
threshold: 0, threshold: 0,
}); });
// End-of-messages marker
this.endOfMessages = document.createElement('div'); this.endOfMessages = document.createElement('div');
this.endOfMessages.classList.add('message-list-bottom'); this.endOfMessages.classList.add('message-list-bottom');
this.prepend(this.endOfMessages); this.prepend(this.endOfMessages);
// Observe existing children and index by uid
for (const c of this.children) { for (const c of this.children) {
this._observer.observe(c); this._observer.observe(c);
if (c instanceof MessageElement) { if (c instanceof MessageElement) {
@ -170,20 +168,30 @@ class MessageList extends HTMLElement {
} }
} }
// Wire up socket events this._updateMessageHandler = (data) => {
app.ws.addEventListener("update_message_text", (data) => {
if (this.messageMap.has(data.uid)) { if (this.messageMap.has(data.uid)) {
this.upsertMessage(data); this.upsertMessage(data);
} }
}); };
app.ws.addEventListener("set_typing", (data) => { this._typingHandler = (data) => {
if (app._debug) console.debug("set_typing event received:", data); if (app._debug) console.debug("set_typing event received:", data);
this.triggerGlow(data.user_uid, data.color); this.triggerGlow(data.user_uid, data.color);
}); };
app.ws.addEventListener("update_message_text", this._updateMessageHandler);
app.ws.addEventListener("set_typing", this._typingHandler);
this.scrollToBottom(true); this.scrollToBottom(true);
} }
disconnectedCallback() {
if (this._observer) {
this._observer.disconnect();
}
app.ws.removeEventListener("update_message_text", this._updateMessageHandler);
app.ws.removeEventListener("set_typing", this._typingHandler);
}
connectedCallback() { connectedCallback() {
this.addEventListener('click', (e) => { this.addEventListener('click', (e) => {
if ( if (