We did do the cool stuff.
This commit is contained in:
		
							parent
							
								
									a41da84e3f
								
							
						
					
					
						commit
						75f12c1971
					
				@ -11,11 +11,15 @@ class ChannelModel(BaseModel):
 | 
			
		||||
    is_listed = ModelField(name="is_listed", required=True, kind=bool, value=True)
 | 
			
		||||
    index = ModelField(name="index", required=True, kind=int, value=1000)
 | 
			
		||||
    last_message_on = ModelField(name="last_message_on", required=False, kind=str)
 | 
			
		||||
    history_start = ModelField(name="history_start", required=False, kind=str)
 | 
			
		||||
 | 
			
		||||
    async def get_last_message(self) -> ChannelMessageModel:
 | 
			
		||||
        history_start_filter = ""
 | 
			
		||||
        if self["history_start"]:
 | 
			
		||||
            history_start_filter = f" AND created_at > '{self['history_start']}' "
 | 
			
		||||
        try:
 | 
			
		||||
            async for model in self.app.services.channel_message.query(
 | 
			
		||||
                "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT 1",
 | 
			
		||||
                "SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1",
 | 
			
		||||
                {"channel_uid": self["uid"]},
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -88,12 +88,18 @@ class ChannelMessageService(BaseService):
 | 
			
		||||
        return await super().save(model)
 | 
			
		||||
 | 
			
		||||
    async def offset(self, channel_uid, page=0, timestamp=None, page_size=30):
 | 
			
		||||
        channel = await self.services.channel.get(uid=channel_uid)
 | 
			
		||||
        if not channel:
 | 
			
		||||
            return []
 | 
			
		||||
        history_start_filter = ""
 | 
			
		||||
        if channel["history_start"]:
 | 
			
		||||
            history_start_filter = f" AND created_at > '{channel['history_start']}'"    
 | 
			
		||||
        results = []
 | 
			
		||||
        offset = page * page_size
 | 
			
		||||
        try:
 | 
			
		||||
            if timestamp:
 | 
			
		||||
                async for model in self.query(
 | 
			
		||||
                    "SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
 | 
			
		||||
                    f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid AND created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
 | 
			
		||||
                    {
 | 
			
		||||
                        "channel_uid": channel_uid,
 | 
			
		||||
                        "page_size": page_size,
 | 
			
		||||
@ -104,7 +110,7 @@ class ChannelMessageService(BaseService):
 | 
			
		||||
                    results.append(model)
 | 
			
		||||
            elif page > 0:
 | 
			
		||||
                async for model in self.query(
 | 
			
		||||
                    "SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp ORDER BY created_at DESC LIMIT :page_size",
 | 
			
		||||
                    f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size",
 | 
			
		||||
                    {
 | 
			
		||||
                        "channel_uid": channel_uid,
 | 
			
		||||
                        "page_size": page_size,
 | 
			
		||||
@ -115,7 +121,7 @@ class ChannelMessageService(BaseService):
 | 
			
		||||
                    results.append(model)
 | 
			
		||||
            else:
 | 
			
		||||
                async for model in self.query(
 | 
			
		||||
                    "SELECT * FROM channel_message WHERE channel_uid=:channel_uid ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
 | 
			
		||||
                    f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
 | 
			
		||||
                    {
 | 
			
		||||
                        "channel_uid": channel_uid,
 | 
			
		||||
                        "page_size": page_size,
 | 
			
		||||
@ -124,7 +130,7 @@ class ChannelMessageService(BaseService):
 | 
			
		||||
                ):
 | 
			
		||||
                    results.append(model)
 | 
			
		||||
 | 
			
		||||
        except:
 | 
			
		||||
            pass
 | 
			
		||||
        except Exception as ex:
 | 
			
		||||
            print(ex)
 | 
			
		||||
        results.sort(key=lambda x: x["created_at"])
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
@ -12,6 +12,7 @@ class UserService(BaseService):
 | 
			
		||||
 | 
			
		||||
    async def search(self, query, **kwargs):
 | 
			
		||||
        query = query.strip().lower()
 | 
			
		||||
        kwarggs["deleted_at"] = None
 | 
			
		||||
        if not query:
 | 
			
		||||
            return []
 | 
			
		||||
        results = []
 | 
			
		||||
 | 
			
		||||
@ -149,7 +149,7 @@ class FileUploadGrid extends NjetComponent {
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        ws.onmessage = (event) => {
 | 
			
		||||
            cconsole.info(event.data)
 | 
			
		||||
            console.info(event.data)
 | 
			
		||||
 | 
			
		||||
            const data = JSON.parse(event.data);
 | 
			
		||||
            if (data.type === 'progress') {
 | 
			
		||||
 | 
			
		||||
@ -25,9 +25,11 @@ class BaseMapper:
 | 
			
		||||
        return asyncio.get_event_loop()
 | 
			
		||||
 | 
			
		||||
    async def run_in_executor(self, func, *args, **kwargs):
 | 
			
		||||
        async with self.semaphore:
 | 
			
		||||
            return func(*args, **kwargs)
 | 
			
		||||
            # return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs))
 | 
			
		||||
        use_semaphore = kwargs.pop("use_semaphore", False)
 | 
			
		||||
        if use_semaphore:
 | 
			
		||||
            async with self.semaphore:
 | 
			
		||||
                return func(*args, **kwargs)
 | 
			
		||||
        return await self.loop.run_in_executor(None, lambda: func(*args, **kwargs))
 | 
			
		||||
 | 
			
		||||
    async def new(self):
 | 
			
		||||
        return self.model_class(mapper=self, app=self.app)
 | 
			
		||||
@ -39,7 +41,8 @@ class BaseMapper:
 | 
			
		||||
    async def get(self, uid: str = None, **kwargs) -> BaseModel:
 | 
			
		||||
        if uid:
 | 
			
		||||
            kwargs["uid"] = uid
 | 
			
		||||
 | 
			
		||||
        if not kwargs.get("deleted_at"):
 | 
			
		||||
            kwargs["deleted_at"] = None
 | 
			
		||||
        record = await self.run_in_executor(self.table.find_one, **kwargs)
 | 
			
		||||
        if not record:
 | 
			
		||||
            return None
 | 
			
		||||
@ -48,7 +51,6 @@ class BaseMapper:
 | 
			
		||||
        for key, value in record.items():
 | 
			
		||||
            model[key] = value
 | 
			
		||||
        return model
 | 
			
		||||
        return await self.model_class.from_record(mapper=self, record=record)
 | 
			
		||||
 | 
			
		||||
    async def exists(self, **kwargs):
 | 
			
		||||
        return await self.run_in_executor(self.table.exists, **kwargs)
 | 
			
		||||
@ -60,26 +62,39 @@ class BaseMapper:
 | 
			
		||||
        if not model.record.get("uid"):
 | 
			
		||||
            raise Exception(f"Attempt to save without uid: {model.record}.")
 | 
			
		||||
        model.updated_at.update()
 | 
			
		||||
        return await self.run_in_executor(self.table.upsert, model.record, ["uid"])
 | 
			
		||||
        return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
 | 
			
		||||
 | 
			
		||||
    async def find(self, **kwargs) -> typing.AsyncGenerator:
 | 
			
		||||
        if not kwargs.get("_limit"):
 | 
			
		||||
            kwargs["_limit"] = self.default_limit
 | 
			
		||||
        if not kwargs.get("deleted_at"):
 | 
			
		||||
            kwargs["deleted_at"] = None
 | 
			
		||||
        for record in await self.run_in_executor(self.table.find, **kwargs):
 | 
			
		||||
            model = await self.new()
 | 
			
		||||
            for key, value in record.items():
 | 
			
		||||
                model[key] = value
 | 
			
		||||
            yield model
 | 
			
		||||
 | 
			
		||||
    async def _use_semaphore(self, sql):
 | 
			
		||||
        sql = sql.lower().strip()
 | 
			
		||||
        return "insert" in sql or "update" in sql or "delete" in sql
 | 
			
		||||
 | 
			
		||||
    async def query(self, sql, *args):
 | 
			
		||||
        for record in await self.run_in_executor(self.db.query, sql, *args):
 | 
			
		||||
        for record in await self.run_in_executor(self.db.query, sql, *args, use_semaphore=await self._use_semaphore(sql)):
 | 
			
		||||
            yield dict(record)
 | 
			
		||||
 | 
			
		||||
    async def update(self, model):
 | 
			
		||||
        if not model["deleted_at"] is None:
 | 
			
		||||
            raise Exception("Can't update deleted record.")
 | 
			
		||||
        model.updated_at.update()
 | 
			
		||||
        return await self.run_in_executor(self.table.update, model.record, ["uid"])
 | 
			
		||||
        return await self.run_in_executor(self.table.update, model.record, ["uid"],use_semaphore=True)
 | 
			
		||||
 | 
			
		||||
    async def upsert(self, model):
 | 
			
		||||
        model.updated_at.update()
 | 
			
		||||
        return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
 | 
			
		||||
 | 
			
		||||
    async def delete(self, **kwargs) -> int:
 | 
			
		||||
        if not kwargs or not isinstance(kwargs, dict):
 | 
			
		||||
            raise Exception("Can't execute delete with no filter.")
 | 
			
		||||
        kwargs["use_semaphore"] = True
 | 
			
		||||
        return await self.run_in_executor(self.table.delete, **kwargs)
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@
 | 
			
		||||
  <script src="/generic-form.js" type="module"></script>
 | 
			
		||||
  <script src="/html-frame.js" type="module"></script>
 | 
			
		||||
  <script src="/app.js" type="module"></script>
 | 
			
		||||
  <script src="/editor.js" type="module"></script> 
 | 
			
		||||
  <script src="/file-manager.js" type="module"></script>
 | 
			
		||||
  <script src="/user-list.js"></script>
 | 
			
		||||
  <script src="/message-list.js" type="module"></script>
 | 
			
		||||
 | 
			
		||||
@ -203,6 +203,13 @@ class RPCView(BaseView):
 | 
			
		||||
                )
 | 
			
		||||
            return channels
 | 
			
		||||
        
 | 
			
		||||
        async def clear_channel(self, channel_uid):
 | 
			
		||||
            self._require_login()
 | 
			
		||||
            user = await self.services.user.get(uid=self.user_uid)
 | 
			
		||||
            if not user["is_admin"]:
 | 
			
		||||
                raise Exception("Not allowed")
 | 
			
		||||
            return await self.services.channel_message.clear(channel_uid)
 | 
			
		||||
 | 
			
		||||
        async def write_container(self, channel_uid, content,timeout=3):
 | 
			
		||||
            self._require_login()
 | 
			
		||||
            channel_member = await self.services.channel_member.get(
 | 
			
		||||
 | 
			
		||||
@ -71,6 +71,7 @@ class WebView(BaseView):
 | 
			
		||||
        await self.app.services.channel_member.save(channel_member)
 | 
			
		||||
 | 
			
		||||
        user = await self.services.user.get(uid=self.session.get("uid"))
 | 
			
		||||
 | 
			
		||||
        messages = [
 | 
			
		||||
            await self.app.services.channel_message.to_extended_dict(message)
 | 
			
		||||
            for message in await self.app.services.channel_message.offset(
 | 
			
		||||
@ -81,7 +82,7 @@ class WebView(BaseView):
 | 
			
		||||
            await self.app.services.notification.mark_as_read(
 | 
			
		||||
                self.session.get("uid"), message["uid"]
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        print(messages)
 | 
			
		||||
        name = await channel_member.get_name()
 | 
			
		||||
        return await self.render_template(
 | 
			
		||||
            "web.html",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user