diff --git a/src/snek/app.py b/src/snek/app.py index 9822d2d..09df0bd 100644 --- a/src/snek/app.py +++ b/src/snek/app.py @@ -5,6 +5,7 @@ import ssl import uuid import signal from datetime import datetime +from contextlib import asynccontextmanager from snek import snode from snek.view.threads import ThreadsView @@ -230,6 +231,7 @@ class Application(BaseApplication): except Exception as ex: print(ex) self.db.commit() + async def prepare_database(self, app): self.db.query("PRAGMA journal_mode=WAL") @@ -452,7 +454,28 @@ class Application(BaseApplication): template_paths.append(self.template_path) return FileSystemLoader(template_paths) - + + @asynccontextmanager + async def no_save(self): + stats = { + 'count': 0 + } + async def patched_save(*args, **kwargs): + await self.cache.set(args[0]["uid"], args[0]) + stats['count'] = stats['count'] + 1 + print(f"save is ignored {stats['count']} times") + return args[0] + save_original = app.services.channel_message.mapper.save + self.services.channel_message.mapper.save = patched_save + raised_exception = None + try: + yield + except Exception as ex: + raised_exception = ex + finally: + self.services.channel_message.mapper.save = save_original + if raised_exception: + raise raised_exception app = Application(db_path="sqlite:///snek.db") diff --git a/src/snek/system/cache.py b/src/snek/system/cache.py index 8f8cdc3..19b8ebf 100644 --- a/src/snek/system/cache.py +++ b/src/snek/system/cache.py @@ -14,7 +14,7 @@ class Cache: self.cache = {} self.max_items = max_items self.stats = {} - self.enabled = False + self.enabled = True self.lru = [] self.version = ((42 + 420 + 1984 + 1990 + 10 + 6 + 71 + 3004 + 7245) ^ 1337) + 4 diff --git a/src/snek/system/mapper.py b/src/snek/system/mapper.py index aa4af18..22f958e 100644 --- a/src/snek/system/mapper.py +++ b/src/snek/system/mapper.py @@ -28,7 +28,13 @@ class BaseMapper: use_semaphore = kwargs.pop("use_semaphore", False) if use_semaphore: async with self.semaphore: - return func(*args, **kwargs) + database_exception = None + for x in range(20): + try: + return func(*args, **kwargs) + except Exception as ex: + database_exception = ex + raise database_exception return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs)) async def new(self): diff --git a/src/snek/system/service.py b/src/snek/system/service.py index bf0c9d6..41c87a4 100644 --- a/src/snek/system/service.py +++ b/src/snek/system/service.py @@ -40,11 +40,11 @@ class BaseService: yield record async def get(self, uid=None, **kwargs): + kwargs["deleted_at"] = None if uid: - if not kwargs: - result = await self.cache.get(uid) - if False and result and result.__class__ == self.mapper.model_class: - return result + result = await self.cache.get(uid) + if result and result.__class__ == self.mapper.model_class: + return result kwargs["uid"] = uid result = await self.mapper.get(**kwargs) @@ -52,7 +52,7 @@ class BaseService: await self.cache.set(result["uid"], result) return result - async def save(self, model: UserModel): + async def save(self, model): # if model.is_valid: You Know why not if await self.mapper.save(model): await self.cache.set(model["uid"], model) diff --git a/src/snek/view/rpc.py b/src/snek/view/rpc.py index 29a4f65..5edc494 100644 --- a/src/snek/view/rpc.py +++ b/src/snek/view/rpc.py @@ -308,40 +308,40 @@ class RPCView(BaseView): return True async def update_message_text(self, message_uid, text): - self._require_login() - message = await self.services.channel_message.get(message_uid) - if message["user_uid"] != self.user_uid: - raise Exception("Not allowed") + async with self.app.no_save(): + self._require_login() + message = await self.services.channel_message.get(message_uid) + if message["user_uid"] != self.user_uid: + raise Exception("Not allowed") - if message.get_seconds_since_last_update() > 3: - await self.finalize_message(message["uid"]) - return { - "error": "Message too old", - "seconds_since_last_update": message.get_seconds_since_last_update(), - "success": False, - } + if message.get_seconds_since_last_update() > 5: + return { + "error": "Message too old", + "seconds_since_last_update": message.get_seconds_since_last_update(), + "success": False, + } - message["message"] = text - if not text: - message["deleted_at"] = now() - else: - message["deleted_at"] = None + message["message"] = text + if not text: + message["deleted_at"] = now() + else: + message["deleted_at"] = None - await self.services.channel_message.save(message) - data = message.record - data["text"] = message["message"] - data["message_uid"] = message_uid + await self.services.channel_message.save(message) + data = message.record + data["text"] = message["message"] + data["message_uid"] = message_uid - await self.services.socket.broadcast( - message["channel_uid"], - { - "channel_uid": message["channel_uid"], - "event": "update_message_text", - "data": message.record, - }, - ) + await self.services.socket.broadcast( + message["channel_uid"], + { + "channel_uid": message["channel_uid"], + "event": "update_message_text", + "data": message.record, + }, + ) - return {"success": True} + return {"success": True} async def send_message(self, channel_uid, message, is_final=True): self._require_login()