This commit is contained in:
retoor 2025-05-16 00:04:19 +02:00
parent 79c39828f0
commit c5b55399a1
2 changed files with 35 additions and 5 deletions

View File

@ -105,12 +105,15 @@ class Application(BaseApplication):
self.services = get_services(app=self) self.services = get_services(app=self)
self.mappers = get_mappers(app=self) self.mappers = get_mappers(app=self)
self.broadcast_service = None self.broadcast_service = None
self.on_startup.append(self.start_ssh_server) self.user_availability_service_task = None
self.on_startup.append(self.prepare_asyncio) self.on_startup.append(self.prepare_asyncio)
self.on_startup.append(self.start_user_availability_service)
self.on_startup.append(self.start_ssh_server)
self.on_startup.append(self.prepare_database) self.on_startup.append(self.prepare_database)
async def start_user_availability_service(self, app):
app.user_availability_service_task = asyncio.create_task(app.services.socket.user_availability_service())
async def snode_sync(self, app): async def snode_sync(self, app):
self.sync_service = asyncio.create_task(snode.sync_service(app)) self.sync_service = asyncio.create_task(snode.sync_service(app))

View File

@ -1,7 +1,11 @@
from snek.model.user import UserModel from snek.model.user import UserModel
from snek.system.service import BaseService from snek.system.service import BaseService
from datetime import datetime from datetime import datetime
import json import json
import asyncio
import logging
logger = logging.getLogger(__name__)
from snek.system.model import now
class SocketService(BaseService): class SocketService(BaseService):
@ -36,10 +40,28 @@ class SocketService(BaseService):
self.users = {} self.users = {}
self.subscriptions = {} self.subscriptions = {}
self.last_update = str(datetime.now()) self.last_update = str(datetime.now())
async def user_availability_service(self):
logger.info("User availability update service started.")
while True:
logger.info("Updating user availability...")
users_updated = []
for s in self.sockets:
if not s.is_connected:
continue
if not s.user in users_updated:
s.user["last_ping"] = now()
await self.app.services.user.save(s.user)
users_updated.append(s.user)
logger.info(f"Updated user availability for {len(users_updated)} online users.")
await asyncio.sleep(60)
async def add(self, ws, user_uid): async def add(self, ws, user_uid):
s = self.Socket(ws, await self.app.services.user.get(uid=user_uid)) s = self.Socket(ws, await self.app.services.user.get(uid=user_uid))
self.sockets.add(s) self.sockets.add(s)
logger.info(f"Added socket for user {s.user['username']}")
if not self.users.get(user_uid): if not self.users.get(user_uid):
self.users[user_uid] = set() self.users[user_uid] = set()
self.users[user_uid].add(s) self.users[user_uid].add(s)
@ -62,16 +84,21 @@ class SocketService(BaseService):
await self._broadcast(channel_uid, message) await self._broadcast(channel_uid, message)
async def _broadcast(self, channel_uid, message): async def _broadcast(self, channel_uid, message):
sent = 0
try: try:
async for user_uid in self.services.channel_member.get_user_uids( async for user_uid in self.services.channel_member.get_user_uids(
channel_uid channel_uid
): ):
await self.send_to_user(user_uid, message) await self.send_to_user(user_uid, message)
sent += 1
except Exception as ex: except Exception as ex:
print(ex, flush=True) print(ex, flush=True)
logger.info(f"Broadcasted a message to {sent} users.")
return True return True
async def delete(self, ws): async def delete(self, ws):
for s in [sock for sock in self.sockets if sock.ws == ws]: for s in [sock for sock in self.sockets if sock.ws == ws]:
await s.close() await s.close()
logger.info(f"Removed socket for user {s.user['username']}")
self.sockets.remove(s) self.sockets.remove(s)