refactor: remove presence debounce for instant user departures

refactor: simplify websocket connection and error handling in rpc view
This commit is contained in:
retoor 2025-12-17 22:47:21 +01:00
parent 2e886c7bc1
commit bdc8f149dc
4 changed files with 43 additions and 65 deletions

View File

@ -6,6 +6,14 @@
## Version 1.6.0 - 2025-12-17
Removes presence debounce to make user departures instant. Simplifies websocket connection and error handling in RPC views for improved reliability.
**Changes:** 2 files, 98 lines
**Languages:** Python (98 lines)
## Version 1.5.0 - 2025-12-17 ## Version 1.5.0 - 2025-12-17
remove umami analytics script remove umami analytics script

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "Snek" name = "Snek"
version = "1.5.0" version = "1.6.0"
readme = "README.md" readme = "README.md"
#license = { file = "LICENSE", content-type="text/markdown" } #license = { file = "LICENSE", content-type="text/markdown" }
description = "Snek Chat Application by Molodetz" description = "Snek Chat Application by Molodetz"

View File

@ -8,9 +8,6 @@ from snek.system.service import BaseService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from snek.system.model import now from snek.system.model import now
PRESENCE_DEBOUNCE_SECONDS = 3.0
class SocketService(BaseService): class SocketService(BaseService):
class Socket: class Socket:
@ -48,7 +45,6 @@ class SocketService(BaseService):
self.users = {} self.users = {}
self.subscriptions = {} self.subscriptions = {}
self.last_update = str(datetime.now()) self.last_update = str(datetime.now())
self._departure_tasks = {}
async def user_availability_service(self): async def user_availability_service(self):
logger.info("User availability update service started.") logger.info("User availability update service started.")
@ -90,10 +86,6 @@ class SocketService(BaseService):
logger.info(f"Added socket for user {s.user['username']}") logger.info(f"Added socket for user {s.user['username']}")
is_first_connection = False is_first_connection = False
if user_uid in self._departure_tasks:
self._departure_tasks[user_uid].cancel()
del self._departure_tasks[user_uid]
if not self.users.get(user_uid): if not self.users.get(user_uid):
self.users[user_uid] = set() self.users[user_uid] = set()
is_first_connection = True is_first_connection = True
@ -147,25 +139,7 @@ class SocketService(BaseService):
if user_uid in self.users: if user_uid in self.users:
self.users[user_uid].discard(s) self.users[user_uid].discard(s)
if len(self.users[user_uid]) == 0: if len(self.users[user_uid]) == 0:
await self._schedule_departure(user_uid, user_nick, user_color) await self._broadcast_presence("departed", user_uid, user_nick, user_color)
async def _schedule_departure(self, user_uid, user_nick, user_color):
if user_uid in self._departure_tasks:
return
async def delayed_departure():
try:
await asyncio.sleep(PRESENCE_DEBOUNCE_SECONDS)
if user_uid in self.users and len(self.users[user_uid]) == 0:
await self._broadcast_presence("departed", user_uid, user_nick, user_color)
if user_uid in self._departure_tasks:
del self._departure_tasks[user_uid]
except asyncio.CancelledError:
pass
except Exception as ex:
logger.warning(f"Error in departure broadcast: {ex}")
self._departure_tasks[user_uid] = asyncio.create_task(delayed_departure())
async def _broadcast_presence(self, event_type, user_uid, user_nick, user_color): async def _broadcast_presence(self, event_type, user_uid, user_nick, user_color):
if not user_uid or not user_nick: if not user_uid or not user_nick:

View File

@ -627,44 +627,40 @@ class RPCView(BaseView):
ws = web.WebSocketResponse() ws = web.WebSocketResponse()
await ws.prepare(self.request) await ws.prepare(self.request)
user_uid = self.request.session.get("uid") if self.request.session.get("logged_in") else None if self.request.session.get("logged_in"):
await self.services.socket.add(ws, self.request.session.get("uid"))
try: async for subscription in self.services.channel_member.find(
if user_uid: user_uid=self.request.session.get("uid"),
await self.services.socket.add(ws, user_uid) deleted_at=None,
async for subscription in self.services.channel_member.find( is_banned=False,
user_uid=user_uid, ):
deleted_at=None, await self.services.socket.subscribe(
is_banned=False, ws, subscription["channel_uid"], self.request.session.get("uid")
):
await self.services.socket.subscribe(
ws, subscription["channel_uid"], user_uid
)
if not scheduled and self.request.app.uptime_seconds < 5:
await schedule(
user_uid,
0,
{"event": "refresh", "data": {"message": "Finishing deployment"}},
)
await schedule(
user_uid,
15,
{"event": "deployed", "data": {"uptime": self.request.app.uptime}},
) )
if not scheduled and self.request.app.uptime_seconds < 5:
await schedule(
self.request.session.get("uid"),
0,
{"event": "refresh", "data": {"message": "Finishing deployment"}},
)
await schedule(
self.request.session.get("uid"),
15,
{"event": "deployed", "data": {"uptime": self.request.app.uptime}},
)
rpc = RPCView.RPCApi(self, ws) rpc = RPCView.RPCApi(self, ws)
async for msg in ws: async for msg in ws:
if msg.type == web.WSMsgType.TEXT: if msg.type == web.WSMsgType.TEXT:
try: try:
await rpc(msg.json()) await rpc(msg.json())
except Exception as ex: except Exception as ex:
logger.exception(ex) print("XXXXXXXXXX Deleting socket", ex, flush=True)
break logger.exception(ex)
elif msg.type == web.WSMsgType.ERROR: await self.services.socket.delete(ws)
break break
elif msg.type == web.WSMsgType.CLOSE: elif msg.type == web.WSMsgType.ERROR:
break pass
finally: elif msg.type == web.WSMsgType.CLOSE:
await self.services.socket.delete(ws) pass
return ws return ws