diff --git a/src/snek/research/serpentarium.py b/src/snek/research/serpentarium.py index e425a23..481e218 100644 --- a/src/snek/research/serpentarium.py +++ b/src/snek/research/serpentarium.py @@ -1,41 +1,6 @@ - import json -import asyncio -import aiohttp -from aiohttp import web -import dataset -import dataset.util -import traceback -import socket -import base64 -import uuid - class DatasetMethod: - def __init__(self, dt, name): - self.dt = dt - self.name = name - - def __call__(self, *args, **kwargs): - return self.dt.ds.call( - self.dt.name, - self.name, - *args, - **kwargs - ) - - class DatasetTable: - - def __init__(self, ds, name): - self.ds = ds - self.name = name - - def __getattr__(self, name): - return DatasetMethod(self, name) - - - - class WebSocketClient2: def __init__(self, uri): self.uri = uri @@ -47,196 +12,21 @@ class WebSocketClient2: if self.loop.is_running(): # Schedule connect in the existing loop self._connect_future = asyncio.run_coroutine_threadsafe(self._connect(), self.loop) - else: - # If loop isn't running, connect synchronously - self.loop.run_until_complete(self._connect()) - - async def _connect(self): - self.websocket = await websockets.connect(self.uri) - # Start listening for messages - asyncio.create_task(self._receive_loop()) - - async def _receive_loop(self): - try: - async for message in self.websocket: - await self.receive_queue.put(message) - except Exception: - pass # Handle exceptions as needed - def send(self, message: str): - if self.loop.is_running(): - # Schedule send in the existing loop - asyncio.run_coroutine_threadsafe(self.websocket.send(message), self.loop) - else: - # If loop isn't running, run directly - self.loop.run_until_complete(self.websocket.send(message)) - - def receive(self): - # Wait for a message synchronously - future = asyncio.run_coroutine_threadsafe(self.receive_queue.get(), self.loop) - return future.result() - def close(self): - if self.websocket: - if self.loop.is_running(): - asyncio.run_coroutine_threadsafe(self.websocket.close(), self.loop) - else: - self.loop.run_until_complete(self.websocket.close()) - - -import websockets - class DatasetWrapper(object): - def __init__(self): - self.ws = WebSocketClient() - - def begin(self): - self.call(None, 'begin') - def commit(self): - self.call(None, 'commit') - - def __getitem__(self, name): - return DatasetTable(self, name) - def query(self, *args, **kwargs): - return self.call(None, 'query', *args, **kwargs) - - def call(self, table, method, *args, **kwargs): - payload = {"table": table, "method": method, "args": args, "kwargs": kwargs,"call_uid":None} - #if method in ['find','find_one']: - payload["call_uid"] = str(uuid.uuid4()) - self.ws.write(json.dumps(payload)) - if payload["call_uid"]: - response = self.ws.read() - return json.loads(response)['result'] - return True - - - class DatasetWebSocketView: def __init__(self): self.ws = None self.db = dataset.connect('sqlite:///snek.db') - self.setattr(self, "db", self.get) - self.setattr(self, "db", self.set) - ) - super() - + setattr(self, "db", self.get) + setattr(self, "db", self.set) def format_result(self, result): - try: - return dict(result) - except: - pass - try: - return [dict(row) for row in result] - except: - pass - return result - async def send_str(self, msg): - return await self.ws.send_str(msg) - def get(self, key): - returnl loads(dict(self.db['_kv'].get(key=key)['value'])) - def set(self, key, value): - return self.db['_kv'].upsert({'key': key, 'value': json.dumps(value)}, ['key']) - - - - async def handle(self, request): - ws = web.WebSocketResponse() - await ws.prepare(request) - self.ws = ws - - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - try: - data = json.loads(msg.data) - call_uid = data.get("call_uid") - method = data.get("method") - table_name = data.get("table") - args = data.get("args", {}) - kwargs = data.get("kwargs", {}) - - - function = getattr(self.db, method, None) - if table_name: - function = getattr(self.db[table_name], method, None) - - print(method, table_name, args, kwargs,flush=True) - - if function: - response = {} - try: - result = function(*args, **kwargs) - print(result) - response['result'] = self.format_result(result) - response["call_uid"] = call_uid - response["success"] = True - except Exception as e: - response["call_uid"] = call_uid - response["success"] = False - response["error"] = str(e) - response["traceback"] = traceback.format_exc() - - if call_uid: - await self.send_str(json.dumps(response,default=str)) - else: - await self.send_str(json.dumps({"status": "error", "error":"Method not found.","call_uid": call_uid})) - except Exception as e: - await self.send_str(json.dumps({"success": False,"call_uid": call_uid, "error": str(e), "error": str(e), "traceback": traceback.format_exc()},default=str)) - elif msg.type == aiohttp.WSMsgType.ERROR: - print('ws connection closed with exception %s' % ws.exception()) - - return ws - - - - - -app = web.Application() -view = DatasetWebSocketView() -app.router.add_get('/db', view.handle) - async def run_server(): - - - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, 'localhost', 3131) - await site.start() - - print("Server started at http://localhost:8080") - await asyncio.Event().wait() - -async def client(): - print("x") - d = DatasetWrapper() - print("y") - - for x in range(100): - for x in range(100): - if d['test'].insert({"name": "test", "number":x}): - print(".",end="",flush=True) - print("") - print(d['test'].find_one(name="test", order_by="-number")) - - print("DONE") - - - -import time -async def main(): - await run_server() - -import sys - -if __name__ == '__main__': - if sys.argv[1] == 'server': - asyncio.run(main()) - if sys.argv[1] == 'client': - asyncio.run(client()) diff --git a/src/snek/sync.py b/src/snek/sync.py index fb2a9af..c1ff46d 100644 --- a/src/snek/sync.py +++ b/src/snek/sync.py @@ -1,135 +1,22 @@ - - - - class DatasetWebSocketView: def __init__(self): self.ws = None self.db = dataset.connect('sqlite:///snek.db') - self.setattr(self, "db", self.get) - self.setattr(self, "db", self.set) - ) + setattr(self, "db", self.get) + setattr(self, "db", self.set) super() def format_result(self, result): - try: - return dict(result) - except: - pass - try: - return [dict(row) for row in result] - except: - pass - return result - async def send_str(self, msg): - return await self.ws.send_str(msg) - def get(self, key): - returnl loads(dict(self.db['_kv'].get(key=key)['value'])) - def set(self, key, value): - return self.db['_kv'].upsert({'key': key, 'value': json.dumps(value)}, ['key']) - - - - async def handle(self, request): - ws = web.WebSocketResponse() - await ws.prepare(request) - self.ws = ws - - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - try: - data = json.loads(msg.data) - call_uid = data.get("call_uid") - method = data.get("method") - table_name = data.get("table") - args = data.get("args", {}) - kwargs = data.get("kwargs", {}) - - - function = getattr(self.db, method, None) - if table_name: - function = getattr(self.db[table_name], method, None) - - print(method, table_name, args, kwargs,flush=True) - - if function: - response = {} - try: - result = function(*args, **kwargs) - print(result) - response['result'] = self.format_result(result) - response["call_uid"] = call_uid - response["success"] = True - except Exception as e: - response["call_uid"] = call_uid - response["success"] = False - response["error"] = str(e) - response["traceback"] = traceback.format_exc() - - if call_uid: - await self.send_str(json.dumps(response,default=str)) - else: - await self.send_str(json.dumps({"status": "error", "error":"Method not found.","call_uid": call_uid})) - except Exception as e: - await self.send_str(json.dumps({"success": False,"call_uid": call_uid, "error": str(e), "error": str(e), "traceback": traceback.format_exc()},default=str)) - elif msg.type == aiohttp.WSMsgType.ERROR: - print('ws connection closed with exception %s' % ws.exception()) - - return ws - class BroadCastSocketView: def __init__(self): self.ws = None - super() def format_result(self, result): - try: - return dict(result) - except: - pass - try: - return [dict(row) for row in result] - except: - pass - return result - async def send_str(self, msg): - return await self.ws.send_str(msg) - def get(self, key): - returnl loads(dict(self.db['_kv'].get(key=key)['value'])) - def set(self, key, value): - return self.db['_kv'].upsert({'key': key, 'value': json.dumps(value)}, ['key']) - - - - async def handle(self, request): - ws = web.WebSocketResponse() - await ws.prepare(request) - self.ws = ws - app = request.app - app['broadcast_clients'].append(ws) - - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - print(msg.data) - for client in app['broadcast_clients'] if not client == ws: - await client.send_str(msg.data) - elif msg.type == aiohttp.WSMsgType.ERROR: - print('ws connection closed with exception %s' % ws.exception()) - app['broadcast_clients'].remove(ws) - return ws - - -app = web.Application() -view = DatasetWebSocketView() -app['broadcast_clients'] = [] -app.router.add_get('/db', view.handle) -app.router.add_get('/broadcast', sync_view.handle) -