Compare commits

...

9 Commits

Author SHA1 Message Date
fadc57a7c7 Update. 2025-09-09 23:40:45 +02:00
cca3946a35 Update channel message. 2025-09-08 06:08:12 +02:00
18be3fdc19 Executor pools. 2025-09-08 01:09:22 +02:00
939e63f244 Executor pools. 2025-09-08 00:59:11 +02:00
b4c267d584 Update. 2025-09-07 02:42:47 +02:00
b9b31a494a Update. 2025-08-31 03:41:41 +02:00
b961954aa1 Update. 2025-08-31 03:27:47 +02:00
84287808c8 Update. 2025-08-31 03:25:26 +02:00
692272e3ca Fixes. 2025-08-31 03:22:00 +02:00
6 changed files with 85 additions and 26 deletions

View File

@ -6,6 +6,7 @@ import uuid
import signal
from datetime import datetime
from contextlib import asynccontextmanager
import aiohttp_debugtoolbar
from snek import snode
from snek.view.threads import ThreadsView
@ -176,6 +177,8 @@ class Application(BaseApplication):
self.on_startup.append(self.start_ssh_server)
self.on_startup.append(self.prepare_database)
async def prepare_stats(self, app):
app['stats'] = create_stats_structure()
print("Stats prepared", flush=True)
@ -287,9 +290,9 @@ class Application(BaseApplication):
self.router.add_view("/login.json", LoginView)
self.router.add_view("/register.html", RegisterView)
self.router.add_view("/register.json", RegisterView)
self.router.add_view("/drive/{rel_path:.*}", DriveView)
self.router.add_view("/drive.bin", UploadView)
self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
# self.router.add_view("/drive/{rel_path:.*}", DriveView)
## self.router.add_view("/drive.bin", UploadView)
# self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
self.router.add_view("/search-user.html", SearchUserView)
self.router.add_view("/search-user.json", SearchUserView)
self.router.add_view("/avatar/{uid}.svg", AvatarView)
@ -297,25 +300,25 @@ class Application(BaseApplication):
self.router.add_get("/http-photo", self.handle_http_photo)
self.router.add_get("/rpc.ws", RPCView)
self.router.add_get("/c/{channel:.*}", ChannelView)
self.router.add_view(
"/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
)
self.router.add_view(
"/channel/{channel_uid}/drive.json", ChannelDriveApiView
)
#self.router.add_view(
# "/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
#)
#self.router.add_view(
# "/channel/{channel_uid}/drive.json", ChannelDriveApiView
#)
self.router.add_view(
"/channel/{channel_uid}/attachment.sock", ChannelAttachmentUploadView
)
self.router.add_view(
"/channel/attachment/{relative_url:.*}", ChannelAttachmentView
)
)#
self.router.add_view("/channel/{channel}.html", WebView)
self.router.add_view("/threads.html", ThreadsView)
self.router.add_view("/terminal.ws", TerminalSocketView)
self.router.add_view("/terminal.html", TerminalView)
self.router.add_view("/drive.json", DriveApiView)
self.router.add_view("/drive.html", DriveView)
self.router.add_view("/drive/{drive}.json", DriveView)
#self.router.add_view("/drive.json", DriveApiView)
#self.router.add_view("/drive.html", DriveView)
#self.router.add_view("/drive/{drive}.json", DriveView)
self.router.add_get("/stats.html", stats_handler)
self.router.add_view("/stats.json", StatsView)
self.router.add_view("/user/{user}.html", UserView)
@ -496,6 +499,7 @@ class Application(BaseApplication):
raise raised_exception
app = Application(db_path="sqlite:///snek.db")
#aiohttp_debugtoolbar.setup(app)
async def main():

View File

@ -23,7 +23,7 @@ class ChannelModel(BaseModel):
history_start_filter = f" AND created_at > '{self['history_start']}' "
try:
async for model in self.app.services.channel_message.query(
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1",
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY id DESC LIMIT 1",
{"channel_uid": self["uid"]},
):

View File

@ -1,6 +1,21 @@
from snek.system.service import BaseService
from snek.system.template import sanitize_html
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
import json
from jinja2 import Environment, FileSystemLoader
global jinja2_env
import pathlib
template_path = pathlib.Path(__file__).parent.parent.joinpath("templates")
def render(context):
template =jinja2_env.get_template("message.html")
return sanitize_html(template.render(**context))
class ChannelMessageService(BaseService):
mapper_name = "channel_message"
@ -8,6 +23,19 @@ class ChannelMessageService(BaseService):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._configured_indexes = False
self._executor_pools = {}
global jinja2_env
jinja2_env = self.app.jinja2_env
self._max_workers = 1
def get_or_create_executor(self, uid):
if not uid in self._executor_pools:
self._executor_pools[uid] = ProcessPoolExecutor(max_workers=5)
return self._executor_pools[uid]
def delete_executor(self, uid):
if uid in self._executor_pools:
self._executor_pools[uid].shutdown()
del self._executor_pools[uid]
async def maintenance(self):
args = {}
@ -69,10 +97,14 @@ class ChannelMessageService(BaseService):
"color": user["color"],
}
)
loop = asyncio.get_event_loop()
try:
template = self.app.jinja2_env.get_template("message.html")
model["html"] = template.render(**context)
model['html'] = sanitize_html(model['html'])
context = json.loads(json.dumps(context, default=str))
model["html"] = await loop.run_in_executor(self.get_or_create_executor(model["uid"]), render,context)
#model['html'] = await loop.run_in_executor(self.get_or_create_executor(user["uid"]), sanitize_html,model['html'])
except Exception as ex:
print(ex, flush=True)
@ -91,6 +123,8 @@ class ChannelMessageService(BaseService):
["deleted_at"], unique=False
)
self._configured_indexes = True
if model['is_final']:
self.delete_executor(model['uid'])
return model
raise Exception(f"Failed to create channel message: {model.errors}.")
@ -127,10 +161,15 @@ class ChannelMessageService(BaseService):
"color": user["color"],
}
)
template = self.app.jinja2_env.get_template("message.html")
model["html"] = template.render(**context)
model['html'] = sanitize_html(model['html'])
return await super().save(model)
context = json.loads(json.dumps(context, default=str))
loop = asyncio.get_event_loop()
model["html"] = await loop.run_in_executor(self.get_or_create_executor(model["uid"]), render, context)
#model['html'] = await loop.run_in_executor(self.get_or_create_executor(user["uid"]), sanitize_html,model['html'])
result = await super().save(model)
if model['is_final']:
self.delete_executor(model['uid'])
return result
async def offset(self, channel_uid, page=0, timestamp=None, page_size=30):
channel = await self.services.channel.get(uid=channel_uid)

View File

@ -44,19 +44,29 @@ class SocketService(BaseService):
async def user_availability_service(self):
logger.info("User availability update service started.")
logger.debug("Entering the main loop.")
while True:
logger.info("Updating user availability...")
logger.debug("Initializing users_updated list.")
users_updated = []
logger.debug("Iterating over sockets.")
for s in self.sockets:
logger.debug(f"Checking connection status for socket: {s}.")
if not s.is_connected:
logger.debug("Socket is not connected, continuing to next socket.")
continue
logger.debug(f"Checking if user {s.user} is already updated.")
if s.user not in users_updated:
logger.debug(f"Updating last_ping for user: {s.user}.")
s.user["last_ping"] = now()
logger.debug(f"Saving user {s.user} to the database.")
await self.app.services.user.save(s.user)
logger.debug(f"Adding user {s.user} to users_updated list.")
users_updated.append(s.user)
logger.info(
f"Updated user availability for {len(users_updated)} online users."
)
logger.debug("Sleeping for 60 seconds before the next update.")
await asyncio.sleep(60)
async def add(self, ws, user_uid):

View File

@ -256,7 +256,9 @@ class MessageList extends HTMLElement {
this.querySelectorAll('.avatar').forEach((el) => {
const anchor = el.closest('a');
if (anchor && typeof anchor.href === 'string' && anchor.href.includes(uid)) {
if(!lastElement)
lastElement = el;
}
});
if (lastElement) {
@ -278,11 +280,15 @@ class MessageList extends HTMLElement {
upsertMessage(data) {
let message = this.messageMap.get(data.uid);
if (message && (data.is_final || !data.message)) {
message.parentElement?.removeChild(message);
//message.parentElement?.removeChild(message);
// TO force insert
message = null;
//message = null;
}
if(message && !data.message){
message.parentElement?.removeChild(message);
message = null;
}
if (!data.message) return;
const wrapper = document.createElement("div");

View File

@ -529,8 +529,8 @@ class RPCView(BaseView):
try:
await self.ws.send_str(json.dumps(obj, default=str))
except Exception as ex:
print("THIS IS THE DeAL>",str(ex), flush=True)
await self.services.socket.delete(self.ws)
await self.ws.close()
async def get_online_users(self, channel_uid):
self._require_login()
@ -638,7 +638,7 @@ class RPCView(BaseView):
try:
await rpc(msg.json())
except Exception as ex:
print("Deleting socket", ex, flush=True)
print("XXXXXXXXXX Deleting socket", ex, flush=True)
logger.exception(ex)
await self.services.socket.delete(ws)
break