|
from snek.model.user import UserModel
|
|
|
|
|
|
from snek.system.service import BaseService
|
|
|
|
|
|
class SocketService(BaseService):
    """Keep track of open websocket connections and push JSON messages to them.

    Connections are indexed twice: ``sockets`` holds every registered
    connection, and ``users`` maps a user uid to the set of connections that
    were registered for that uid. ``subscriptions`` is reserved for
    per-channel subscriptions, which are currently disabled (see
    ``subscribe``).
    """

    class Socket:
        """A websocket connection together with the user it was opened for."""

        def __init__(self, ws, user: UserModel):
            """Wrap ``ws`` for ``user`` and mark the connection as live."""
            self.ws = ws
            self.is_connected = True
            self.user = user

        async def send_json(self, data):
            """Send ``data`` through the wrapped websocket.

            Returns:
                ``True`` when ``ws.send_json`` completed without raising;
                ``False`` when the socket was already marked disconnected or
                the send raised (in which case the socket is marked
                disconnected so later sends are skipped).
            """
            if not self.is_connected:
                return False
            try:
                await self.ws.send_json(data)
            except Exception as ex:
                # Best effort: a failing peer must not abort the caller's
                # delivery loop, so report the error and flag the socket dead.
                print(ex,flush=True)
                self.is_connected = False
                # Report the failure so callers counting deliveries stay exact.
                return False
            return True

        async def close(self):
            """Close the wrapped websocket if it is still marked connected.

            Returns:
                ``True`` once the socket is closed (or was already closed).

            Any exception raised by ``ws.close()`` propagates, but the socket
            is marked disconnected regardless so it is never closed twice.
            """
            if not self.is_connected:
                return True
            try:
                await self.ws.close()
            finally:
                self.is_connected = False
            return True

    def __init__(self, app):
        """Create an empty registry bound to ``app``."""
        super().__init__(app)
        self.sockets = set()  # every registered Socket
        self.users = {}  # user uid -> set of Socket
        self.subscriptions = {}  # channel uid -> set of Socket (unused for now)

    async def add(self, ws, user_uid):
        """Register ``ws`` as a connection belonging to ``user_uid``.

        The user record is fetched through ``self.app.services.user.get``.
        """
        user = await self.app.services.user.get(uid=user_uid)
        sock = self.Socket(ws, user)
        self.sockets.add(sock)
        self.users.setdefault(user_uid, set()).add(sock)

    async def subscribe(self, ws,channel_uid, user_uid):
        """Subscribe ``ws`` to ``channel_uid``; currently a no-op.

        NOTE(review): the early ``return`` disables this method, so nothing
        below it runs. The disabled implementation is kept for reference;
        confirm whether it should be re-enabled or removed.
        """
        return
        if channel_uid not in self.subscriptions:
            self.subscriptions[channel_uid] = set()
        s = self.Socket(ws,await self.app.services.user.get(uid=user_uid))
        self.subscriptions[channel_uid].add(s)

    async def send_to_user(self, user_uid, message):
        """Send ``message`` to every connection registered for ``user_uid``.

        Returns:
            The number of connections for which the send succeeded.
        """
        count = 0
        # Iterate over a snapshot: ``send_json`` awaits, and ``add``/``delete``
        # may mutate the underlying set while this coroutine is suspended.
        for sock in tuple(self.users.get(user_uid, ())):
            if await sock.send_json(message):
                count += 1
        return count

    async def broadcast(self, channel_uid, message):
        """Send ``message`` to every member of ``channel_uid``.

        Members are read from ``self.app.services.channel_member.find`` and
        each one is addressed through ``send_to_user``.

        Returns:
            Always ``True``.
        """
        async for channel_member in self.app.services.channel_member.find(channel_uid=channel_uid):
            await self.send_to_user(channel_member["user_uid"], message)
        return True

    async def delete(self, ws):
        """Close and unregister every connection that wraps ``ws``.

        The connection is removed from both ``sockets`` and ``users`` even
        when closing it raises; the exception still propagates afterwards.
        """
        for sock in [s for s in self.sockets if s.ws == ws]:
            try:
                await sock.close()
            finally:
                # ``discard``: a concurrent ``delete`` may already have removed
                # this socket while ``close`` was awaited.
                self.sockets.discard(sock)
                self._drop_from_users(sock)

    def _drop_from_users(self, sock):
        """Remove ``sock`` from the per-user index, pruning emptied entries."""
        owners = [uid for uid, socks in self.users.items() if sock in socks]
        for uid in owners:
            socks = self.users[uid]
            socks.discard(sock)
            if not socks:
                del self.users[uid]
|
|
|
|
|