diff --git a/src/snek/app.py b/src/snek/app.py index fa211d1..24af03b 100644 --- a/src/snek/app.py +++ b/src/snek/app.py @@ -105,12 +105,15 @@ class Application(BaseApplication): self.services = get_services(app=self) self.mappers = get_mappers(app=self) self.broadcast_service = None - self.on_startup.append(self.start_ssh_server) + self.user_availability_service_task = None + self.on_startup.append(self.prepare_asyncio) + self.on_startup.append(self.start_user_availability_service) + self.on_startup.append(self.start_ssh_server) self.on_startup.append(self.prepare_database) - - + async def start_user_availability_service(self, app): + app.user_availability_service_task = asyncio.create_task(app.services.socket.user_availability_service()) async def snode_sync(self, app): self.sync_service = asyncio.create_task(snode.sync_service(app)) diff --git a/src/snek/service/socket.py b/src/snek/service/socket.py index eb40234..ecc7bf9 100644 --- a/src/snek/service/socket.py +++ b/src/snek/service/socket.py @@ -1,7 +1,11 @@ from snek.model.user import UserModel from snek.system.service import BaseService from datetime import datetime -import json +import json +import asyncio +import logging +logger = logging.getLogger(__name__) +from snek.system.model import now class SocketService(BaseService): @@ -36,10 +40,28 @@ class SocketService(BaseService): self.users = {} self.subscriptions = {} self.last_update = str(datetime.now()) - + + + async def user_availability_service(self): + logger.info("User availability update service started.") + while True: + logger.info("Updating user availability...") + users_updated = [] + for s in self.sockets: + if not s.is_connected: + continue + if not s.user in users_updated: + s.user["last_ping"] = now() + await self.app.services.user.save(s.user) + users_updated.append(s.user) + logger.info(f"Updated user availability for {len(users_updated)} online users.") + await asyncio.sleep(60) + + async def add(self, ws, user_uid): s = self.Socket(ws, await self.app.services.user.get(uid=user_uid)) self.sockets.add(s) + logger.info(f"Added socket for user {s.user['username']}") if not self.users.get(user_uid): self.users[user_uid] = set() self.users[user_uid].add(s) @@ -62,16 +84,21 @@ class SocketService(BaseService): await self._broadcast(channel_uid, message) async def _broadcast(self, channel_uid, message): + sent = 0 try: async for user_uid in self.services.channel_member.get_user_uids( channel_uid ): await self.send_to_user(user_uid, message) + sent += 1 except Exception as ex: print(ex, flush=True) + logger.info(f"Broadcasted a message to {sent} users.") return True async def delete(self, ws): for s in [sock for sock in self.sockets if sock.ws == ws]: await s.close() + logger.info(f"Removed socket for user {s.user['username']}") self.sockets.remove(s) +