diff --git a/src/snek/model/channel.py b/src/snek/model/channel.py
index 0a90c39..6e32335 100644
--- a/src/snek/model/channel.py
+++ b/src/snek/model/channel.py
@@ -11,11 +11,15 @@ class ChannelModel(BaseModel):
is_listed = ModelField(name="is_listed", required=True, kind=bool, value=True)
index = ModelField(name="index", required=True, kind=int, value=1000)
last_message_on = ModelField(name="last_message_on", required=False, kind=str)
+ history_start = ModelField(name="history_start", required=False, kind=str)
async def get_last_message(self) -> ChannelMessageModel:
+ history_start_filter = ""
+ if self["history_start"]:
+ history_start_filter = f" AND created_at > '{self['history_start']}' "
try:
async for model in self.app.services.channel_message.query(
- "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT 1",
+ "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1",
{"channel_uid": self["uid"]},
):
diff --git a/src/snek/service/channel_message.py b/src/snek/service/channel_message.py
index bfc5954..c27571b 100644
--- a/src/snek/service/channel_message.py
+++ b/src/snek/service/channel_message.py
@@ -88,12 +88,18 @@ class ChannelMessageService(BaseService):
return await super().save(model)
async def offset(self, channel_uid, page=0, timestamp=None, page_size=30):
+ channel = await self.services.channel.get(uid=channel_uid)
+ if not channel:
+ return []
+ history_start_filter = ""
+ if channel["history_start"]:
+ history_start_filter = f" AND created_at > '{channel['history_start']}'"
results = []
offset = page * page_size
try:
if timestamp:
async for model in self.query(
- "SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
+ f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
{
"channel_uid": channel_uid,
"page_size": page_size,
@@ -104,7 +110,7 @@ class ChannelMessageService(BaseService):
results.append(model)
elif page > 0:
async for model in self.query(
- "SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size",
+ f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size",
{
"channel_uid": channel_uid,
"page_size": page_size,
@@ -115,7 +121,7 @@ class ChannelMessageService(BaseService):
results.append(model)
else:
async for model in self.query(
- "SELECT * FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
+ f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
{
"channel_uid": channel_uid,
"page_size": page_size,
@@ -124,7 +130,7 @@ class ChannelMessageService(BaseService):
):
results.append(model)
- except:
- pass
+ except Exception as ex:
+ print(ex)
results.sort(key=lambda x: x["created_at"])
return results
diff --git a/src/snek/service/user.py b/src/snek/service/user.py
index 34dc468..09b2a09 100644
--- a/src/snek/service/user.py
+++ b/src/snek/service/user.py
@@ -12,6 +12,7 @@ class UserService(BaseService):
async def search(self, query, **kwargs):
query = query.strip().lower()
+ kwarggs["deleted_at"] = None
if not query:
return []
results = []
diff --git a/src/snek/static/file-upload-grid.js b/src/snek/static/file-upload-grid.js
index 0a623f6..32669a7 100644
--- a/src/snek/static/file-upload-grid.js
+++ b/src/snek/static/file-upload-grid.js
@@ -149,7 +149,7 @@ class FileUploadGrid extends NjetComponent {
};
ws.onmessage = (event) => {
- cconsole.info(event.data)
+ console.info(event.data)
const data = JSON.parse(event.data);
if (data.type === 'progress') {
diff --git a/src/snek/system/mapper.py b/src/snek/system/mapper.py
index fef7784..aa4af18 100644
--- a/src/snek/system/mapper.py
+++ b/src/snek/system/mapper.py
@@ -25,9 +25,11 @@ class BaseMapper:
return asyncio.get_event_loop()
async def run_in_executor(self, func, *args, **kwargs):
- async with self.semaphore:
- return func(*args, **kwargs)
- # return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs))
+ use_semaphore = kwargs.pop("use_semaphore", False)
+ if use_semaphore:
+ async with self.semaphore:
+ return func(*args, **kwargs)
+ return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs))
async def new(self):
return self.model_class(mapper=self, app=self.app)
@@ -39,7 +41,8 @@ class BaseMapper:
async def get(self, uid: str = None, **kwargs) -> BaseModel:
if uid:
kwargs["uid"] = uid
-
+ if not kwargs.get("deleted_at"):
+ kwargs["deleted_at"] = None
record = await self.run_in_executor(self.table.find_one, **kwargs)
if not record:
return None
@@ -48,7 +51,6 @@ class BaseMapper:
for key, value in record.items():
model[key] = value
return model
- return await self.model_class.from_record(mapper=self, record=record)
async def exists(self, **kwargs):
return await self.run_in_executor(self.table.exists, **kwargs)
@@ -60,26 +62,39 @@ class BaseMapper:
if not model.record.get("uid"):
raise Exception(f"Attempt to save without uid: {model.record}.")
model.updated_at.update()
- return await self.run_in_executor(self.table.upsert, model.record, ["uid"])
+ return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
async def find(self, **kwargs) -> typing.AsyncGenerator:
if not kwargs.get("_limit"):
kwargs["_limit"] = self.default_limit
+ if not kwargs.get("deleted_at"):
+ kwargs["deleted_at"] = None
for record in await self.run_in_executor(self.table.find, **kwargs):
model = await self.new()
for key, value in record.items():
model[key] = value
yield model
+ async def _use_semaphore(self, sql):
+ sql = sql.lower().strip()
+ return "insert" in sql or "update" in sql or "delete" in sql
+
async def query(self, sql, *args):
- for record in await self.run_in_executor(self.db.query, sql, *args):
+ for record in await self.run_in_executor(self.db.query, sql, *args, use_semaphore=await self._use_semaphore(sql)):
yield dict(record)
async def update(self, model):
+ if not model["deleted_at"] is None:
+ raise Exception("Can't update deleted record.")
model.updated_at.update()
- return await self.run_in_executor(self.table.update, model.record, ["uid"])
+ return await self.run_in_executor(self.table.update, model.record, ["uid"],use_semaphore=True)
+
+ async def upsert(self, model):
+ model.updated_at.update()
+ return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
async def delete(self, **kwargs) -> int:
if not kwargs or not isinstance(kwargs, dict):
raise Exception("Can't execute delete with no filter.")
+ kwargs["use_semaphore"] = True
return await self.run_in_executor(self.table.delete, **kwargs)
diff --git a/src/snek/templates/app.html b/src/snek/templates/app.html
index be50cdf..eff9c2a 100644
--- a/src/snek/templates/app.html
+++ b/src/snek/templates/app.html
@@ -18,6 +18,7 @@
+
diff --git a/src/snek/view/rpc.py b/src/snek/view/rpc.py
index e7e9468..29a4f65 100644
--- a/src/snek/view/rpc.py
+++ b/src/snek/view/rpc.py
@@ -202,7 +202,14 @@ class RPCView(BaseView):
}
)
return channels
-
+
+ async def clear_channel(self, channel_uid):
+ self._require_login()
+ user = await self.services.user.get(uid=self.user_uid)
+ if not user["is_admin"]:
+ raise Exception("Not allowed")
+ return await self.services.channel_message.clear(channel_uid)
+
async def write_container(self, channel_uid, content,timeout=3):
self._require_login()
channel_member = await self.services.channel_member.get(
diff --git a/src/snek/view/web.py b/src/snek/view/web.py
index 700aa23..6e27d24 100644
--- a/src/snek/view/web.py
+++ b/src/snek/view/web.py
@@ -71,6 +71,7 @@ class WebView(BaseView):
await self.app.services.channel_member.save(channel_member)
user = await self.services.user.get(uid=self.session.get("uid"))
+
messages = [
await self.app.services.channel_message.to_extended_dict(message)
for message in await self.app.services.channel_message.offset(
@@ -81,7 +82,7 @@ class WebView(BaseView):
await self.app.services.notification.mark_as_read(
self.session.get("uid"), message["uid"]
)
-
+ print(messages)
name = await channel_member.get_name()
return await self.render_template(
"web.html",