refactor: enhance socket service robustness with locks and error handling

fix: add null checks and safe access utilities to prevent crashes
refactor: restructure socket methods for better concurrency and logging
This commit is contained in:
retoor 2025-12-18 22:04:01 +01:00
parent 70a405b231
commit b710008dbe
6 changed files with 1618 additions and 677 deletions

View File

@ -8,6 +8,14 @@
## Version 1.8.0 - 2025-12-18
The socket service now handles errors more robustly and prevents crashes through improved safety checks. Socket methods support better concurrency and provide enhanced logging for developers.
**Changes:** 4 files, 2279 lines
**Languages:** JavaScript (592 lines), Python (1687 lines)
## Version 1.7.0 - 2025-12-17 ## Version 1.7.0 - 2025-12-17
Fixes socket cleanup in the websocket handler to prevent resource leaks and improve connection stability. Fixes socket cleanup in the websocket handler to prevent resource leaks and improve connection stability.

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "Snek" name = "Snek"
version = "1.7.0" version = "1.8.0"
readme = "README.md" readme = "README.md"
#license = { file = "LICENSE", content-type="text/markdown" } #license = { file = "LICENSE", content-type="text/markdown" }
description = "Snek Chat Application by Molodetz" description = "Snek Chat Application by Molodetz"

View File

@ -1,12 +1,34 @@
# retoor <retoor@molodetz.nl>
import asyncio import asyncio
import logging import logging
from datetime import datetime from datetime import datetime
from snek.model.user import UserModel from snek.model.user import UserModel
from snek.system.service import BaseService from snek.system.service import BaseService
from snek.system.model import now
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from snek.system.model import now
def safe_get(obj, key, default=None):
if obj is None:
return default
try:
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
except Exception:
return default
def safe_str(obj):
if obj is None:
return ""
try:
return str(obj)
except Exception:
return ""
class SocketService(BaseService): class SocketService(BaseService):
@ -15,28 +37,43 @@ class SocketService(BaseService):
self.ws = ws self.ws = ws
self.is_connected = True self.is_connected = True
self.user = user self.user = user
self.user_uid = user["uid"] if user else None self.user_uid = safe_get(user, "uid") if user else None
self.user_color = user["color"] if user else None self.user_color = safe_get(user, "color") if user else None
self._lock = asyncio.Lock()
self.subscribed_channels = set()
async def send_json(self, data): async def send_json(self, data):
if not self.is_connected: if not self.is_connected:
return False return False
try: if not self.ws:
await self.ws.send_json(data)
except Exception:
self.is_connected = False self.is_connected = False
return self.is_connected return False
if data is None:
return False
async with self._lock:
try:
await self.ws.send_json(data)
return True
except ConnectionResetError:
self.is_connected = False
logger.debug("Connection reset during send_json")
except Exception as ex:
self.is_connected = False
logger.debug(f"send_json failed: {safe_str(ex)}")
return False
async def close(self): async def close(self):
if not self.is_connected: if not self.is_connected:
return True return True
async with self._lock:
try: try:
await self.ws.close() if self.ws:
except Exception: await self.ws.close()
pass except Exception as ex:
self.is_connected = False logger.debug(f"Socket close failed: {safe_str(ex)}")
finally:
self.is_connected = False
self.subscribed_channels.clear()
return True return True
def __init__(self, app): def __init__(self, app):
@ -45,124 +82,245 @@ class SocketService(BaseService):
self.users = {} self.users = {}
self.subscriptions = {} self.subscriptions = {}
self.last_update = str(datetime.now()) self.last_update = str(datetime.now())
self._lock = asyncio.Lock()
async def user_availability_service(self): async def user_availability_service(self):
logger.info("User availability update service started.") logger.info("User availability update service started.")
logger.debug("Entering the main loop.")
while True: while True:
logger.info("Updating user availability...") try:
logger.debug("Initializing users_updated list.") users_updated = []
users_updated = [] sockets_copy = list(self.sockets)
logger.debug("Iterating over sockets.") for s in sockets_copy:
for s in self.sockets: try:
logger.debug(f"Checking connection status for socket: {s}.") if not s or not s.is_connected:
if not s.is_connected: continue
logger.debug("Socket is not connected, continuing to next socket.") if not s.user:
continue continue
logger.debug(f"Checking if user {s.user} is already updated.") if s.user in users_updated:
if s.user not in users_updated: continue
logger.debug(f"Updating last_ping for user: {s.user}.") s.user["last_ping"] = now()
s.user["last_ping"] = now() if self.app and hasattr(self.app, "services") and self.app.services:
logger.debug(f"Saving user {s.user} to the database.") await self.app.services.user.save(s.user)
await self.app.services.user.save(s.user) users_updated.append(s.user)
logger.debug(f"Adding user {s.user} to users_updated list.") except Exception as ex:
users_updated.append(s.user) logger.debug(f"Failed to update user availability: {safe_str(ex)}")
logger.info( logger.info(f"Updated user availability for {len(users_updated)} online users.")
f"Updated user availability for {len(users_updated)} online users." except Exception as ex:
) logger.warning(f"User availability service error: {safe_str(ex)}")
logger.debug("Sleeping for 60 seconds before the next update.") try:
await asyncio.sleep(60) await asyncio.sleep(60)
except asyncio.CancelledError:
logger.info("User availability service cancelled")
break
async def add(self, ws, user_uid): async def add(self, ws, user_uid):
if not ws:
return None
if not user_uid: if not user_uid:
return None return None
user = await self.app.services.user.get(uid=user_uid) try:
if not user: if not self.app or not hasattr(self.app, "services") or not self.app.services:
logger.warning("Services not available for socket add")
return None
user = await self.app.services.user.get(uid=user_uid)
if not user:
logger.warning(f"User not found for socket add: {user_uid}")
return None
s = self.Socket(ws, user)
async with self._lock:
self.sockets.add(s)
try:
s.user["last_ping"] = now()
await self.app.services.user.save(s.user)
except Exception as ex:
logger.debug(f"Failed to update last_ping: {safe_str(ex)}")
username = safe_get(s.user, "username", "unknown")
logger.info(f"Added socket for user {username}")
is_first_connection = False
async with self._lock:
if user_uid not in self.users:
self.users[user_uid] = set()
is_first_connection = True
elif len(self.users[user_uid]) == 0:
is_first_connection = True
self.users[user_uid].add(s)
if is_first_connection:
nick = safe_get(s.user, "nick") or safe_get(s.user, "username", "")
color = s.user_color
await self._broadcast_presence("arrived", user_uid, nick, color)
return s
except Exception as ex:
logger.warning(f"Failed to add socket: {safe_str(ex)}")
return None return None
s = self.Socket(ws, user)
self.sockets.add(s)
s.user["last_ping"] = now()
await self.app.services.user.save(s.user)
logger.info(f"Added socket for user {s.user['username']}")
is_first_connection = False
if not self.users.get(user_uid):
self.users[user_uid] = set()
is_first_connection = True
elif len(self.users[user_uid]) == 0:
is_first_connection = True
self.users[user_uid].add(s)
if is_first_connection:
await self._broadcast_presence("arrived", user_uid, s.user["nick"] or s.user["username"], s.user_color)
return s
async def subscribe(self, ws, channel_uid, user_uid): async def subscribe(self, ws, channel_uid, user_uid):
if channel_uid not in self.subscriptions: if not ws or not channel_uid or not user_uid:
self.subscriptions[channel_uid] = set() return False
s = self.Socket(ws, await self.app.services.user.get(uid=user_uid)) try:
self.subscriptions[channel_uid].add(s) async with self._lock:
existing_socket = None
for sock in self.sockets:
if sock and sock.ws == ws and sock.user_uid == user_uid:
existing_socket = sock
break
if not existing_socket:
return False
existing_socket.subscribed_channels.add(channel_uid)
if channel_uid not in self.subscriptions:
self.subscriptions[channel_uid] = set()
self.subscriptions[channel_uid].add(user_uid)
return True
except Exception as ex:
logger.warning(f"Failed to subscribe: {safe_str(ex)}")
return False
async def send_to_user(self, user_uid, message): async def send_to_user(self, user_uid, message):
if not user_uid or message is None:
return 0
count = 0 count = 0
for s in list(self.users.get(user_uid, [])): try:
if await s.send_json(message): async with self._lock:
count += 1 user_sockets = list(self.users.get(user_uid, []))
for s in user_sockets:
if not s:
continue
try:
if await s.send_json(message):
count += 1
except Exception as ex:
logger.debug(f"Failed to send to user socket: {safe_str(ex)}")
except Exception as ex:
logger.warning(f"send_to_user failed: {safe_str(ex)}")
return count return count
async def broadcast(self, channel_uid, message): async def broadcast(self, channel_uid, message):
await self._broadcast(channel_uid, message) if not channel_uid or message is None:
return False
return await self._broadcast(channel_uid, message)
async def _broadcast(self, channel_uid, message): async def _broadcast(self, channel_uid, message):
if not channel_uid or message is None:
return False
sent = 0 sent = 0
user_uids_to_send = set()
try: try:
async for user_uid in self.services.channel_member.get_user_uids( if self.services:
channel_uid try:
): async for user_uid in self.services.channel_member.get_user_uids(channel_uid):
sent += await self.send_to_user(user_uid, message) if user_uid:
user_uids_to_send.add(user_uid)
except Exception as ex:
logger.warning(f"Broadcast db query failed: {safe_str(ex)}")
if not user_uids_to_send:
async with self._lock:
if channel_uid in self.subscriptions:
user_uids_to_send = set(self.subscriptions[channel_uid])
for user_uid in user_uids_to_send:
try:
sent += await self.send_to_user(user_uid, message)
except Exception as ex:
logger.debug(f"Failed to send to user {user_uid}: {safe_str(ex)}")
logger.debug(f"Broadcasted a message to {sent} users.")
return True
except Exception as ex: except Exception as ex:
print(ex, flush=True) logger.warning(f"Broadcast failed: {safe_str(ex)}")
logger.info(f"Broadcasted a message to {sent} users.") return False
return True
async def delete(self, ws): async def delete(self, ws):
for s in [sock for sock in self.sockets if sock.ws == ws]: if not ws:
await s.close() return
user_uid = s.user_uid sockets_to_remove = []
user_nick = (s.user["nick"] or s.user["username"]) if s.user else None departures_to_broadcast = []
user_color = s.user_color async with self._lock:
logger.info(f"Removed socket for user {s.user['username'] if s.user else 'unknown'}") sockets_to_remove = [sock for sock in self.sockets if sock and sock.ws == ws]
self.sockets.discard(s) for s in sockets_to_remove:
self.sockets.discard(s)
if user_uid: user_uid = s.user_uid
if user_uid in self.users: if user_uid and user_uid in self.users:
self.users[user_uid].discard(s) self.users[user_uid].discard(s)
if len(self.users[user_uid]) == 0: if len(self.users[user_uid]) == 0:
await self._broadcast_presence("departed", user_uid, user_nick, user_color) del self.users[user_uid]
user_nick = None
try:
if s.user:
user_nick = safe_get(s.user, "nick") or safe_get(s.user, "username")
except Exception:
pass
if user_nick:
departures_to_broadcast.append((user_uid, user_nick, s.user_color))
for channel_uid in list(s.subscribed_channels):
if channel_uid in self.subscriptions:
self.subscriptions[channel_uid].discard(user_uid)
if len(self.subscriptions[channel_uid]) == 0:
del self.subscriptions[channel_uid]
for s in sockets_to_remove:
try:
username = safe_get(s.user, "username", "unknown") if s.user else "unknown"
logger.info(f"Removed socket for user {username}")
await s.close()
except Exception as ex:
logger.warning(f"Socket close failed: {safe_str(ex)}")
for user_uid, user_nick, user_color in departures_to_broadcast:
try:
await self._broadcast_presence("departed", user_uid, user_nick, user_color)
except Exception as ex:
logger.debug(f"Failed to broadcast departure: {safe_str(ex)}")
async def _broadcast_presence(self, event_type, user_uid, user_nick, user_color): async def _broadcast_presence(self, event_type, user_uid, user_nick, user_color):
if not user_uid or not user_nick: if not user_uid or not user_nick:
return return
message = { if not event_type or event_type not in ("arrived", "departed"):
"event": "user_presence", return
"data": { try:
"type": event_type, message = {
"user_uid": user_uid, "event": "user_presence",
"user_nick": user_nick, "data": {
"user_color": user_color, "type": event_type,
"timestamp": datetime.now().isoformat() "user_uid": user_uid,
"user_nick": user_nick,
"user_color": user_color,
"timestamp": datetime.now().isoformat(),
},
} }
} sent_count = 0
sent_count = 0 async with self._lock:
for s in list(self.sockets): sockets_copy = list(self.sockets)
if not s.is_connected: for s in sockets_copy:
continue if not s or not s.is_connected:
if s.user_uid == user_uid: continue
continue if s.user_uid == user_uid:
try: continue
if await s.send_json(message): try:
sent_count += 1 if await s.send_json(message):
except Exception: sent_count += 1
pass except Exception as ex:
logger.info(f"Broadcast presence '{event_type}' for {user_nick} to {sent_count} users") logger.debug(f"Failed to send presence to socket: {safe_str(ex)}")
logger.info(f"Broadcast presence '{event_type}' for {user_nick} to {sent_count} users")
except Exception as ex:
logger.warning(f"Broadcast presence failed: {safe_str(ex)}")
async def get_connected_users(self):
try:
async with self._lock:
return list(self.users.keys())
except Exception:
return []
async def get_user_socket_count(self, user_uid):
if not user_uid:
return 0
try:
async with self._lock:
return len(self.users.get(user_uid, []))
except Exception:
return 0
async def is_user_online(self, user_uid):
if not user_uid:
return False
try:
async with self._lock:
user_sockets = self.users.get(user_uid, set())
return any(s.is_connected for s in user_sockets if s)
except Exception:
return False

View File

@ -1,33 +1,178 @@
// retoor <retoor@molodetz.nl>
export class EventHandler { export class EventHandler {
constructor() { constructor() {
this.subscribers = {}; this.subscribers = {};
this._maxListeners = 100;
this._warnOnMaxListeners = true;
} }
addEventListener(type, handler, { once = false } = {}) { addEventListener(type, handler, options = {}) {
if (!this.subscribers[type]) this.subscribers[type] = []; if (!type || typeof type !== "string") {
if (once) { console.warn("EventHandler: Invalid event type");
const originalHandler = handler; return false;
handler = (...args) => { }
originalHandler(...args); if (!handler || typeof handler !== "function") {
this.removeEventListener(type, handler); console.warn("EventHandler: Invalid handler");
}; return false;
}
try {
const once = options && options.once === true;
if (!this.subscribers[type]) {
this.subscribers[type] = [];
}
if (this._warnOnMaxListeners && this.subscribers[type].length >= this._maxListeners) {
console.warn(`EventHandler: Max listeners (${this._maxListeners}) reached for event "${type}"`);
}
let wrappedHandler = handler;
if (once) {
const originalHandler = handler;
wrappedHandler = (...args) => {
try {
originalHandler(...args);
} catch (e) {
console.error(`EventHandler: Error in once handler for "${type}":`, e);
} finally {
this.removeEventListener(type, wrappedHandler);
}
};
wrappedHandler._original = originalHandler;
}
this.subscribers[type].push(wrappedHandler);
return true;
} catch (e) {
console.error("EventHandler: addEventListener error:", e);
return false;
} }
this.subscribers[type].push(handler);
} }
emit(type, ...data) { emit(type, ...data) {
if (this.subscribers[type]) if (!type || typeof type !== "string") {
this.subscribers[type].forEach((handler) => handler(...data)); return false;
}
try {
const handlers = this.subscribers[type];
if (!handlers || !Array.isArray(handlers) || handlers.length === 0) {
return false;
}
const handlersCopy = [...handlers];
for (const handler of handlersCopy) {
if (typeof handler !== "function") {
continue;
}
try {
handler(...data);
} catch (e) {
console.error(`EventHandler: Error in handler for "${type}":`, e);
}
}
return true;
} catch (e) {
console.error("EventHandler: emit error:", e);
return false;
}
} }
removeEventListener(type, handler) { removeEventListener(type, handler) {
if (!this.subscribers[type]) return; if (!type || typeof type !== "string") {
this.subscribers[type] = this.subscribers[type].filter( return false;
(h) => h !== handler }
); try {
if (!this.subscribers[type]) {
if (this.subscribers[type].length === 0) { return false;
delete this.subscribers[type]; }
if (!handler) {
delete this.subscribers[type];
return true;
}
const originalLength = this.subscribers[type].length;
this.subscribers[type] = this.subscribers[type].filter((h) => {
if (h === handler) return false;
if (h._original && h._original === handler) return false;
return true;
});
if (this.subscribers[type].length === 0) {
delete this.subscribers[type];
}
return this.subscribers[type] ? this.subscribers[type].length < originalLength : true;
} catch (e) {
console.error("EventHandler: removeEventListener error:", e);
return false;
} }
} }
removeAllEventListeners(type) {
try {
if (type) {
if (this.subscribers[type]) {
delete this.subscribers[type];
return true;
}
return false;
}
this.subscribers = {};
return true;
} catch (e) {
console.error("EventHandler: removeAllEventListeners error:", e);
return false;
}
}
hasEventListener(type, handler) {
if (!type || typeof type !== "string") {
return false;
}
try {
if (!this.subscribers[type]) {
return false;
}
if (!handler) {
return this.subscribers[type].length > 0;
}
return this.subscribers[type].some((h) => {
if (h === handler) return true;
if (h._original && h._original === handler) return true;
return false;
});
} catch (e) {
return false;
}
}
getEventListenerCount(type) {
try {
if (type) {
return this.subscribers[type] ? this.subscribers[type].length : 0;
}
let count = 0;
for (const key in this.subscribers) {
if (Object.prototype.hasOwnProperty.call(this.subscribers, key)) {
count += this.subscribers[key].length;
}
}
return count;
} catch (e) {
return 0;
}
}
getEventTypes() {
try {
return Object.keys(this.subscribers);
} catch (e) {
return [];
}
}
once(type, handler) {
return this.addEventListener(type, handler, { once: true });
}
off(type, handler) {
return this.removeEventListener(type, handler);
}
on(type, handler) {
return this.addEventListener(type, handler);
}
} }

View File

@ -1,110 +1,270 @@
// retoor <retoor@molodetz.nl>
import { EventHandler } from "./event-handler.js"; import { EventHandler } from "./event-handler.js";
function createPromiseWithResolvers() {
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject, resolved: false };
}
export class Socket extends EventHandler { export class Socket extends EventHandler {
/** url = null;
* @type {URL}
*/
url;
/**
* @type {WebSocket|null}
*/
ws = null; ws = null;
/**
* @type {null|PromiseWithResolvers<Socket>&{resolved?:boolean}}
*/
connection = null; connection = null;
shouldReconnect = true; shouldReconnect = true;
_debug = false; _debug = false;
_reconnectAttempts = 0;
_maxReconnectAttempts = 50;
_reconnectDelay = 4000;
_pendingCalls = new Map();
_callTimeout = 30000;
_isDestroyed = false;
get isConnected() { get isConnected() {
return this.ws && this.ws.readyState === WebSocket.OPEN; try {
return this.ws !== null && this.ws.readyState === WebSocket.OPEN;
} catch (e) {
return false;
}
} }
get isConnecting() { get isConnecting() {
return this.ws && this.ws.readyState === WebSocket.CONNECTING; try {
return this.ws !== null && this.ws.readyState === WebSocket.CONNECTING;
} catch (e) {
return false;
}
} }
constructor() { constructor() {
super(); super();
try {
this.url = new URL("/rpc.ws", window.location.origin); this.url = new URL("/rpc.ws", window.location.origin);
this.url.protocol = this.url.protocol.replace("http", "ws"); this.url.protocol = this.url.protocol.replace("http", "ws");
this.connect();
this.connect(); } catch (e) {
console.error("Socket initialization failed:", e);
}
} }
connect() { connect() {
if (this.ws) { if (this._isDestroyed) {
return this.connection.promise; return Promise.reject(new Error("Socket destroyed"));
} }
if (this.ws && (this.isConnected || this.isConnecting)) {
if (!this.connection || this.connection.resolved) { return this.connection ? this.connection.promise : Promise.resolve(this);
this.connection = Promise.withResolvers();
} }
try {
this.ws = new WebSocket(this.url); this._cleanup();
this.ws.addEventListener("open", () => { if (!this.connection || this.connection.resolved) {
this.connection.resolved = true; this.connection = createPromiseWithResolvers();
this.connection.resolve(this);
this.emit("connected");
});
this.ws.addEventListener("close", () => {
console.log("Connection closed");
this.disconnect();
});
this.ws.addEventListener("error", (e) => {
console.error("Connection error", e);
this.disconnect();
});
this.ws.addEventListener("message", (e) => {
if (e.data instanceof Blob || e.data instanceof ArrayBuffer) {
console.error("Binary data not supported");
} else {
try {
this.onData(JSON.parse(e.data));
} catch (e) {
console.error("Failed to parse message", e);
}
} }
}); if (!this.url) {
this.connection.reject(new Error("URL not initialized"));
return this.connection.promise;
}
this.ws = new WebSocket(this.url);
this.ws.addEventListener("open", () => {
try {
this._reconnectAttempts = 0;
if (this.connection && !this.connection.resolved) {
this.connection.resolved = true;
this.connection.resolve(this);
}
this.emit("connected");
} catch (e) {
console.error("Open handler error:", e);
}
});
this.ws.addEventListener("close", (event) => {
try {
const reason = event.reason || "Connection closed";
console.log("Connection closed:", reason);
this._handleDisconnect();
} catch (e) {
console.error("Close handler error:", e);
}
});
this.ws.addEventListener("error", (e) => {
try {
console.error("Connection error:", e);
this._handleDisconnect();
} catch (ex) {
console.error("Error handler error:", ex);
}
});
this.ws.addEventListener("message", (e) => {
this._handleMessage(e);
});
return this.connection.promise;
} catch (e) {
console.error("Connect failed:", e);
return Promise.reject(e);
}
}
_handleMessage(e) {
if (!e || !e.data) {
return;
}
try {
if (e.data instanceof Blob || e.data instanceof ArrayBuffer) {
console.warn("Binary data not supported");
return;
}
let data;
try {
data = JSON.parse(e.data);
} catch (parseError) {
console.error("Failed to parse message:", parseError);
return;
}
if (data) {
this.onData(data);
}
} catch (error) {
console.error("Message handling error:", error);
}
} }
onData(data) { onData(data) {
if (data.success !== undefined && !data.success) { if (!data || typeof data !== "object") {
console.error(data); return;
} }
if (data.callId) { try {
this.emit(data.callId, data.data); if (data.success !== undefined && !data.success) {
console.error("RPC error:", data);
}
if (data.callId) {
try {
const response = {
data: data.data,
success: data.success !== false,
error: data.success === false ? (data.data && data.data.error ? data.data.error : "Unknown error") : null
};
this.emit(data.callId, response);
if (this._pendingCalls.has(data.callId)) {
clearTimeout(this._pendingCalls.get(data.callId));
this._pendingCalls.delete(data.callId);
}
} catch (e) {
console.error("CallId emit error:", e);
}
}
if (data.channel_uid) {
try {
this.emit(data.channel_uid, data.data);
if (!data.event) {
this.emit("channel-message", data);
}
} catch (e) {
console.error("Channel emit error:", e);
}
}
try {
this.emit("data", data.data);
} catch (e) {
console.error("Data emit error:", e);
}
if (data.event) {
try {
if (this._debug) {
console.info([data.event, data.data]);
}
this.emit(data.event, data.data);
} catch (e) {
console.error("Event emit error:", e);
}
}
} catch (error) {
console.error("onData error:", error);
} }
if (data.channel_uid) { }
this.emit(data.channel_uid, data.data);
if (!data["event"]) this.emit("channel-message", data); _cleanup() {
try {
if (this.ws) {
try {
this.ws.close();
} catch (e) {
// ignore
}
this.ws = null;
}
} catch (e) {
console.error("Cleanup error:", e);
} }
this.emit("data", data.data); }
if (data["event"]) {
console.info([data.event,data.data]) _handleDisconnect() {
this.emit(data.event, data.data); this._cleanup();
this._rejectPendingCalls("Connection lost");
if (this.connection && !this.connection.resolved) {
this.connection.resolved = true;
this.connection.reject(new Error("Connection failed"));
}
if (this.shouldReconnect && !this._isDestroyed) {
this._reconnectAttempts++;
if (this._reconnectAttempts <= this._maxReconnectAttempts) {
const delay = Math.min(
this._reconnectDelay * Math.pow(1.5, Math.min(this._reconnectAttempts - 1, 10)),
30000
);
setTimeout(() => {
if (!this._isDestroyed && this.shouldReconnect) {
console.log(`Reconnecting (attempt ${this._reconnectAttempts}/${this._maxReconnectAttempts})`);
this.emit("reconnecting");
this.connect();
}
}, delay);
} else {
console.error("Max reconnection attempts reached");
this.emit("reconnect_failed");
}
}
}
_rejectPendingCalls(reason) {
try {
for (const [callId, timeoutId] of this._pendingCalls) {
try {
clearTimeout(timeoutId);
this.emit(callId, { data: null, error: reason, success: false });
} catch (e) {
// ignore
}
}
this._pendingCalls.clear();
} catch (e) {
console.error("Failed to reject pending calls:", e);
} }
} }
disconnect() { disconnect() {
this.ws?.close(); this.shouldReconnect = false;
this.ws = null; this._cleanup();
this._rejectPendingCalls("Disconnected");
}
if (this.shouldReconnect) destroy() {
setTimeout(() => { this._isDestroyed = true;
console.log("Reconnecting"); this.disconnect();
this.emit("reconnecting"); this.removeAllEventListeners();
return this.connect();
}, 4000);
} }
_camelToSnake(str) { _camelToSnake(str) {
return str.replace(/([a-z])([A-Z])/g, "$1_$2").toLowerCase(); if (!str || typeof str !== "string") {
return "";
}
try {
return str.replace(/([a-z])([A-Z])/g, "$1_$2").toLowerCase();
} catch (e) {
return str;
}
} }
get client() { get client() {
@ -113,39 +273,128 @@ export class Socket extends EventHandler {
{}, {},
{ {
get(_, prop) { get(_, prop) {
if (!prop || typeof prop !== "string") {
return () => Promise.reject(new Error("Invalid method name"));
}
return (...args) => { return (...args) => {
const functionName = me._camelToSnake(prop); try {
if(me._debug){ const functionName = me._camelToSnake(prop);
const call = {} if (me._debug) {
call[functionName] = args const call = {};
console.debug(call) call[functionName] = args;
console.debug(call);
} }
return me.call(functionName, ...args); return me.call(functionName, ...args);
} catch (e) {
console.error("Client call error:", e);
return Promise.reject(e);
}
}; };
}, },
}, }
); );
} }
generateCallId() { generateCallId() {
return self.crypto.randomUUID(); try {
if (typeof crypto !== "undefined" && crypto.randomUUID) {
return crypto.randomUUID();
}
return "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(/[xy]/g, (c) => {
const r = (Math.random() * 16) | 0;
const v = c === "x" ? r : (r & 0x3) | 0x8;
return v.toString(16);
});
} catch (e) {
return Date.now().toString(36) + Math.random().toString(36).substring(2);
}
} }
async sendJson(data) { async sendJson(data) {
await this.connect().then((api) => { if (this._isDestroyed) {
api.ws.send(JSON.stringify(data)); throw new Error("Socket destroyed");
}); }
if (!data) {
throw new Error("No data to send");
}
try {
await this.connect();
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("WebSocket not open");
}
const jsonStr = JSON.stringify(data);
this.ws.send(jsonStr);
} catch (e) {
console.error("sendJson error:", e);
throw e;
}
} }
async call(method, ...args) { async call(method, ...args) {
const call = { if (this._isDestroyed) {
callId: this.generateCallId(), return Promise.reject(new Error("Socket destroyed"));
}
if (!method || typeof method !== "string") {
return Promise.reject(new Error("Invalid method name"));
}
const callId = this.generateCallId();
const callData = {
callId,
method, method,
args, args: args || [],
}; };
return new Promise((resolve) => { return new Promise((resolve, reject) => {
this.addEventListener(call.callId, (data) => resolve(data), { once: true}); const timeoutId = setTimeout(() => {
this.sendJson(call); try {
this._pendingCalls.delete(callId);
this.removeEventListener(callId, handler);
reject(new Error(`RPC call timeout: ${method}`));
} catch (e) {
reject(e);
}
}, this._callTimeout);
this._pendingCalls.set(callId, timeoutId);
const handler = (response) => {
try {
clearTimeout(timeoutId);
this._pendingCalls.delete(callId);
if (response && !response.success && response.error) {
reject(new Error(response.error));
} else {
resolve(response ? response.data : null);
}
} catch (e) {
reject(e);
}
};
try {
this.addEventListener(callId, handler, { once: true });
this.sendJson(callData).catch((e) => {
clearTimeout(timeoutId);
this._pendingCalls.delete(callId);
this.removeEventListener(callId, handler);
reject(e);
});
} catch (e) {
clearTimeout(timeoutId);
this._pendingCalls.delete(callId);
reject(e);
}
}); });
} }
async callWithRetry(method, args = [], maxRetries = 3) {
let lastError;
for (let i = 0; i < maxRetries; i++) {
try {
return await this.call(method, ...args);
} catch (e) {
lastError = e;
if (i < maxRetries - 1) {
await new Promise((r) => setTimeout(r, 1000 * (i + 1)));
}
}
}
throw lastError;
}
} }

File diff suppressed because it is too large Load Diff