From 75f12c1971e23ea7b3f63a88eeb2426bcfb70e03 Mon Sep 17 00:00:00 2001 From: retoor Date: Sat, 14 Jun 2025 08:24:48 +0200 Subject: [PATCH] We did do the cool stuff. --- src/snek/model/channel.py | 6 +++++- src/snek/service/channel_message.py | 16 ++++++++++----- src/snek/service/user.py | 1 + src/snek/static/file-upload-grid.js | 2 +- src/snek/system/mapper.py | 31 +++++++++++++++++++++-------- src/snek/templates/app.html | 1 + src/snek/view/rpc.py | 9 ++++++++- src/snek/view/web.py | 3 ++- 8 files changed, 52 insertions(+), 17 deletions(-) diff --git a/src/snek/model/channel.py b/src/snek/model/channel.py index 0a90c39..6e32335 100644 --- a/src/snek/model/channel.py +++ b/src/snek/model/channel.py @@ -11,11 +11,15 @@ class ChannelModel(BaseModel): is_listed = ModelField(name="is_listed", required=True, kind=bool, value=True) index = ModelField(name="index", required=True, kind=int, value=1000) last_message_on = ModelField(name="last_message_on", required=False, kind=str) + history_start = ModelField(name="history_start", required=False, kind=str) async def get_last_message(self) -> ChannelMessageModel: + history_start_filter = "" + if self["history_start"]: + history_start_filter = f" AND created_at > '{self['history_start']}' " try: async for model in self.app.services.channel_message.query( - "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT 1", + "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1", {"channel_uid": self["uid"]}, ): diff --git a/src/snek/service/channel_message.py b/src/snek/service/channel_message.py index bfc5954..c27571b 100644 --- a/src/snek/service/channel_message.py +++ b/src/snek/service/channel_message.py @@ -88,12 +88,18 @@ class ChannelMessageService(BaseService): return await super().save(model) async def offset(self, channel_uid, page=0, timestamp=None, page_size=30): + channel = await self.services.channel.get(uid=channel_uid) + if not channel: + return [] + history_start_filter = "" + if channel["history_start"]: + history_start_filter = f" AND created_at > '{channel['history_start']}'" results = [] offset = page * page_size try: if timestamp: async for model in self.query( - "SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size OFFSET :offset", + f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset", { "channel_uid": channel_uid, "page_size": page_size, @@ -104,7 +110,7 @@ class ChannelMessageService(BaseService): results.append(model) elif page > 0: async for model in self.query( - "SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size", + f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size", { "channel_uid": channel_uid, "page_size": page_size, @@ -115,7 +121,7 @@ class ChannelMessageService(BaseService): results.append(model) else: async for model in self.query( - "SELECT * FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT :page_size OFFSET :offset", + f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset", { "channel_uid": channel_uid, "page_size": page_size, @@ -124,7 +130,7 @@ class ChannelMessageService(BaseService): ): results.append(model) - except: - pass + except Exception as ex: + print(ex) results.sort(key=lambda x: x["created_at"]) return results diff --git a/src/snek/service/user.py b/src/snek/service/user.py index 34dc468..09b2a09 100644 --- a/src/snek/service/user.py +++ b/src/snek/service/user.py @@ -12,6 +12,7 @@ class UserService(BaseService): async def search(self, query, **kwargs): query = query.strip().lower() + kwarggs["deleted_at"] = None if not query: return [] results = [] diff --git a/src/snek/static/file-upload-grid.js b/src/snek/static/file-upload-grid.js index 0a623f6..32669a7 100644 --- a/src/snek/static/file-upload-grid.js +++ b/src/snek/static/file-upload-grid.js @@ -149,7 +149,7 @@ class FileUploadGrid extends NjetComponent { }; ws.onmessage = (event) => { - cconsole.info(event.data) + console.info(event.data) const data = JSON.parse(event.data); if (data.type === 'progress') { diff --git a/src/snek/system/mapper.py b/src/snek/system/mapper.py index fef7784..aa4af18 100644 --- a/src/snek/system/mapper.py +++ b/src/snek/system/mapper.py @@ -25,9 +25,11 @@ class BaseMapper: return asyncio.get_event_loop() async def run_in_executor(self, func, *args, **kwargs): - async with self.semaphore: - return func(*args, **kwargs) - # return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs)) + use_semaphore = kwargs.pop("use_semaphore", False) + if use_semaphore: + async with self.semaphore: + return func(*args, **kwargs) + return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs)) async def new(self): return self.model_class(mapper=self, app=self.app) @@ -39,7 +41,8 @@ class BaseMapper: async def get(self, uid: str = None, **kwargs) -> BaseModel: if uid: kwargs["uid"] = uid - + if not kwargs.get("deleted_at"): + kwargs["deleted_at"] = None record = await self.run_in_executor(self.table.find_one, **kwargs) if not record: return None @@ -48,7 +51,6 @@ class BaseMapper: for key, value in record.items(): model[key] = value return model - return await self.model_class.from_record(mapper=self, record=record) async def exists(self, **kwargs): return await self.run_in_executor(self.table.exists, **kwargs) @@ -60,26 +62,39 @@ class BaseMapper: if not model.record.get("uid"): raise Exception(f"Attempt to save without uid: {model.record}.") model.updated_at.update() - return await self.run_in_executor(self.table.upsert, model.record, ["uid"]) + return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True) async def find(self, **kwargs) -> typing.AsyncGenerator: if not kwargs.get("_limit"): kwargs["_limit"] = self.default_limit + if not kwargs.get("deleted_at"): + kwargs["deleted_at"] = None for record in await self.run_in_executor(self.table.find, **kwargs): model = await self.new() for key, value in record.items(): model[key] = value yield model + async def _use_semaphore(self, sql): + sql = sql.lower().strip() + return "insert" in sql or "update" in sql or "delete" in sql + async def query(self, sql, *args): - for record in await self.run_in_executor(self.db.query, sql, *args): + for record in await self.run_in_executor(self.db.query, sql, *args, use_semaphore=await self._use_semaphore(sql)): yield dict(record) async def update(self, model): + if not model["deleted_at"] is None: + raise Exception("Can't update deleted record.") model.updated_at.update() - return await self.run_in_executor(self.table.update, model.record, ["uid"]) + return await self.run_in_executor(self.table.update, model.record, ["uid"],use_semaphore=True) + + async def upsert(self, model): + model.updated_at.update() + return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True) async def delete(self, **kwargs) -> int: if not kwargs or not isinstance(kwargs, dict): raise Exception("Can't execute delete with no filter.") + kwargs["use_semaphore"] = True return await self.run_in_executor(self.table.delete, **kwargs) diff --git a/src/snek/templates/app.html b/src/snek/templates/app.html index be50cdf..eff9c2a 100644 --- a/src/snek/templates/app.html +++ b/src/snek/templates/app.html @@ -18,6 +18,7 @@ + diff --git a/src/snek/view/rpc.py b/src/snek/view/rpc.py index e7e9468..29a4f65 100644 --- a/src/snek/view/rpc.py +++ b/src/snek/view/rpc.py @@ -202,7 +202,14 @@ class RPCView(BaseView): } ) return channels - + + async def clear_channel(self, channel_uid): + self._require_login() + user = await self.services.user.get(uid=self.user_uid) + if not user["is_admin"]: + raise Exception("Not allowed") + return await self.services.channel_message.clear(channel_uid) + async def write_container(self, channel_uid, content,timeout=3): self._require_login() channel_member = await self.services.channel_member.get( diff --git a/src/snek/view/web.py b/src/snek/view/web.py index 700aa23..6e27d24 100644 --- a/src/snek/view/web.py +++ b/src/snek/view/web.py @@ -71,6 +71,7 @@ class WebView(BaseView): await self.app.services.channel_member.save(channel_member) user = await self.services.user.get(uid=self.session.get("uid")) + messages = [ await self.app.services.channel_message.to_extended_dict(message) for message in await self.app.services.channel_message.offset( @@ -81,7 +82,7 @@ class WebView(BaseView): await self.app.services.notification.mark_as_read( self.session.get("uid"), message["uid"] ) - + print(messages) name = await channel_member.get_name() return await self.render_template( "web.html",