This commit is contained in:
retoor 2025-06-14 15:20:04 +02:00
parent cac691e9ff
commit c4739ff7d3
4 changed files with 136 additions and 195 deletions
src/snek

View File

@ -247,7 +247,7 @@ class Application(BaseApplication):
except:
pass
await app.services.drive.prepare_all()
await self.services.drive.prepare_all()
self.loop.create_task(self.task_runner())
def setup_router(self):
@ -470,7 +470,7 @@ class Application(BaseApplication):
stats['count'] = stats['count'] + 1
print(f"save is ignored {stats['count']} times")
return args[0]
save_original = app.services.channel_message.mapper.save
save_original = self.services.channel_message.mapper.save
self.services.channel_message.mapper.save = patched_save
raised_exception = None
try:

View File

@ -36,9 +36,17 @@ class ChatService(BaseService):
channel = await self.services.channel.get(uid=channel_uid)
if not channel:
raise Exception("Channel not found.")
channel_message = await self.services.channel_message.create(
channel_uid, user_uid, message, is_final
channel_message = await self.services.channel_message.get(
channel_uid=channel_uid,user_uid=user_uid, is_final=False
)
if channel_message:
channel_message["message"] = message
channel_message["is_final"] = is_final
await self.services.channel_message.save(channel_message)
else:
channel_message = await self.services.channel_message.create(
channel_uid, user_uid, message, is_final
)
channel_message_uid = channel_message["uid"]
user = await self.services.user.get(uid=user_uid)

View File

@ -1,22 +1,23 @@
import { app } from "./app.js";
import { NjetComponent } from "./njet.js";
import { FileUploadGrid } from "./file-upload-grid.js";
class ChatInputComponent extends NjetComponent {
autoCompletions = {
"example 1": () => {
},
"example 2": () => {
},
}
"example 1": () => {},
"example 2": () => {},
};
hiddenCompletions = {
"/starsRender": () => {
app.rpc.starsRender(this.channelUid, this.value.replace("/starsRender ", ""))
}
}
users = []
textarea = null
_value = ""
lastUpdateEvent = null
};
users = [];
textarea = null;
_value = "";
lastUpdateEvent = null;
expiryTimer = null;
queuedMessage = null;
lastMessagePromise = null;
@ -38,7 +39,7 @@ class ChatInputComponent extends NjetComponent {
}
get allAutoCompletions() {
return Object.assign({}, this.autoCompletions, this.hiddenCompletions)
return Object.assign({}, this.autoCompletions, this.hiddenCompletions);
}
resolveAutoComplete(input) {
@ -65,7 +66,7 @@ class ChatInputComponent extends NjetComponent {
}
getAuthors() {
return this.users.flatMap((user) => [user.username, user.nick])
return this.users.flatMap((user) => [user.username, user.nick]);
}
extractMentions(text) {
@ -82,17 +83,14 @@ class ChatInputComponent extends NjetComponent {
const lowerAuthor = author.toLowerCase();
let distance = this.levenshteinDistance(lowerMention, lowerAuthor);
if (!this.isSubsequence(lowerMention, lowerAuthor)) {
distance += 10
distance += 10;
}
if (distance < minDistance) {
minDistance = distance;
closestAuthor = author;
}
});
return { mention, closestAuthor, distance: minDistance };
@ -128,7 +126,6 @@ class ChatInputComponent extends NjetComponent {
return matrix[b.length][a.length];
}
replaceMentionsWithAuthors(text) {
const authors = this.getAuthors();
const mentions = this.extractMentions(text);
@ -143,21 +140,19 @@ class ChatInputComponent extends NjetComponent {
return updatedText;
}
async connectedCallback() {
this.user = null
this.user = null;
app.rpc.getUser(null).then((user) => {
this.user = user
})
this.user = user;
});
this.liveType = this.getAttribute("live-type") === "true";
this.liveTypeInterval =
parseInt(this.getAttribute("live-type-interval")) || 6;
this.liveTypeInterval = parseInt(this.getAttribute("live-type-interval")) || 6;
this.channelUid = this.getAttribute("channel");
app.rpc.getRecentUsers(this.channelUid).then(users => {
this.users = users
})
this.users = users;
});
this.messageUid = null;
this.classList.add("chat-input");
@ -181,24 +176,22 @@ class ChatInputComponent extends NjetComponent {
this.dispatchEvent(new CustomEvent("uploaded", e));
});
this.uploadButton.addEventListener("click", (e) => {
e.preventDefault();
this.fileUploadGrid.openFileDialog()
})
e.preventDefault();
this.fileUploadGrid.openFileDialog();
});
this.subscribe("file-uploading", (e) => {
this.fileUploadGrid.style.display = "block";
this.uploadButton.style.display = "none";
this.textarea.style.display = "none";
})
this.fileUploadGrid.style.display = "block";
this.uploadButton.style.display = "none";
this.textarea.style.display = "none";
});
this.appendChild(this.uploadButton);
this.textarea.addEventListener("blur", () => {
this.updateFromInput("");
});
this.textarea.addEventListener("keyup", (e) => {
if (e.key === "Enter" && !e.shiftKey) {
const message = this.replaceMentionsWithAuthors(this.value);
e.target.value = "";
@ -210,12 +203,10 @@ class ChatInputComponent extends NjetComponent {
autoCompletionHandler();
this.value = "";
e.target.value = "";
return;
}
this.finalizeMessage(this.messageUid)
this.finalizeMessage(this.messageUid);
return;
}
@ -254,9 +245,10 @@ class ChatInputComponent extends NjetComponent {
}, '');
app.rpc.sendMessage(this.channelUid, message, true);
});
setTimeout(() => {
this.focus();
}, 1000)
}, 1000);
}
trackSecondsBetweenEvents(event1Time, event2Time) {
@ -278,33 +270,16 @@ class ChatInputComponent extends NjetComponent {
flagTyping() {
if (this.trackSecondsBetweenEvents(this.lastUpdateEvent, new Date()) >= 1) {
this.lastUpdateEvent = new Date();
app.rpc.set_typing(this.channelUid, this.user.color).catch(() => {
});
app.rpc.set_typing(this.channelUid, this.user?.color).catch(() => {});
}
}
finalizeMessage(messageUid) {
if (!messageUid) {
if (this.value.trim() === "") {
return;
}
this.sendMessage(this.channelUid, this.replaceMentionsWithAuthors(this.value), !this.liveType);
} else if (messageUid.startsWith("?")) {
const lastQueuedMessage = this.queuedMessage;
this.lastMessagePromise?.then((uid) => {
const updatePromise = lastQueuedMessage ? app.rpc.updateMessageText(uid, lastQueuedMessage) : Promise.resolve();
return updatePromise.finally(() => {
return app.rpc.finalizeMessage(uid);
})
})
} else {
app.rpc.finalizeMessage(messageUid)
}
async finalizeMessage(messageUid) {
await app.rpc.sendMessage(this.channelUid, this.replaceMentionsWithAuthors(this.value), true);
this.value = "";
this.messageUid = null;
this.queuedMessage = null;
this.lastMessagePromise = null
this.lastMessagePromise = null;
}
updateFromInput(value) {
@ -315,39 +290,12 @@ class ChatInputComponent extends NjetComponent {
this.value = value;
this.flagTyping()
this.flagTyping();
if (this.liveType && value[0] !== "/") {
this.expiryTimer = setTimeout(() => {
this.finalizeMessage(this.messageUid)
}, this.liveTypeInterval * 1000);
const messageText = this.replaceMentionsWithAuthors(value);
if (this.messageUid?.startsWith("?")) {
this.queuedMessage = messageText;
} else if (this.messageUid) {
app.rpc.updateMessageText(this.messageUid, messageText).then((d) => {
if (!d.success) {
this.messageUid = null
this.updateFromInput(value)
}
})
} else {
const placeHolderId = "?" + crypto.randomUUID();
this.messageUid = placeHolderId;
this.lastMessagePromise = this.sendMessage(this.channelUid, messageText, !this.liveType).then(async (uid) => {
if (this.liveType && this.messageUid === placeHolderId) {
if (this.queuedMessage && this.queuedMessage !== messageText) {
await app.rpc.updateMessageText(uid, this.queuedMessage)
}
this.messageUid = uid;
}
return uid
});
}
this.messageUid = this.sendMessage(this.channelUid, messageText, !this.liveType);
return this.messageUid;
}
}
@ -360,3 +308,4 @@ class ChatInputComponent extends NjetComponent {
}
customElements.define("chat-input", ChatInputComponent);

View File

@ -6,18 +6,19 @@
# MIT License: Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions.
import asyncio
import json
import logging
import traceback
from datetime import datetime
from aiohttp import web
from snek.system.model import now
from snek.system.profiler import Profiler
from snek.system.view import BaseView
import time
import time
logger = logging.getLogger(__name__)
@ -29,18 +30,19 @@ class RPCView(BaseView):
self.services = self.app.services
self.ws = ws
self.user_session = {}
self._finalize_task = None
self._scheduled = []
async def _session_ensure(self):
uid = await self.view.session_get("uid")
uid = self.view.session.get("uid")
if uid not in self.user_session:
self.user_session[uid] = {
"said_hello": False,
}
async def session_get(self, key, default):
async def session_get(self, key, default=None):
await self._session_ensure()
return self.user_session[self.user_uid].get(key, default)
return self.user_session.get(self.user_uid, {}).get(key, default)
async def session_set(self, key, value):
await self._session_ensure()
@ -49,7 +51,6 @@ class RPCView(BaseView):
async def db_insert(self, table_name, record):
self._require_login()
return await self.services.db.insert(self.user_uid, table_name, record)
async def db_update(self, table_name, record):
@ -64,7 +65,7 @@ class RPCView(BaseView):
return await self.services.socket.broadcast(
channel_uid,
{
"channel_uid": "293ecf12-08c9-494b-b423-48ba1a2d12c2",
"channel_uid": channel_uid,
"event": "set_typing",
"data": {
"event": "set_typing",
@ -95,7 +96,7 @@ class RPCView(BaseView):
self.user_uid, table_name, record, keys
)
async def db_query(self, table_name, args):
async def db_query(self, table_name, sql, args):
self._require_login()
return await self.services.db.query(self.user_uid, table_name, sql, args)
@ -129,21 +130,23 @@ class RPCView(BaseView):
self.view.session["logged_in"] = True
self.view.session["username"] = user["username"]
self.view.session["user_nick"] = user["nick"]
record = user.record
del record["password"]
del record["deleted_at"]
record = dict(user.record)
if "password" in record:
del record["password"]
if "deleted_at" in record:
del record["deleted_at"]
await self.services.socket.add(
self.ws, self.view.request.session.get("uid")
self.ws, self.view.session.get("uid")
)
async for subscription in self.services.channel_member.find(
user_uid=self.view.request.session.get("uid"),
user_uid=self.view.session.get("uid"),
deleted_at=None,
is_banned=False,
):
await self.services.socket.subscribe(
self.ws,
subscription["channel_uid"],
self.view.request.session.get("uid"),
self.view.session.get("uid"),
)
return record
@ -151,22 +154,25 @@ class RPCView(BaseView):
self._require_login()
return [user["username"] for user in await self.services.user.search(query)]
async def get_user(self, user_uid):
async def get_user(self, user_uid=None):
self._require_login()
if not user_uid:
user_uid = self.user_uid
user = await self.services.user.get(uid=user_uid)
record = user.record
del record["password"]
del record["deleted_at"]
record = dict(user.record)
if "password" in record:
del record["password"]
if "deleted_at" in record:
del record["deleted_at"]
if user_uid != user["uid"]:
del record["email"]
if "email" in record:
del record["email"]
return record
async def get_messages(self, channel_uid, offset=0, timestamp=None):
self._require_login()
messages = []
for message in await self.services.channel_message.offset(
async for message in self.services.channel_message.offset(
channel_uid, offset or 0, timestamp or None
):
extended_dict = await self.services.channel_message.to_extended_dict(
@ -197,54 +203,56 @@ class RPCView(BaseView):
"new_count": subscription["new_count"],
"is_moderator": subscription["is_moderator"],
"is_read_only": subscription["is_read_only"],
"new_count": subscription["new_count"],
"color": color,
}
)
return channels
async def clear_channel(self, channel_uid):
self._require_login()
user = await self.services.user.get(uid=self.user_uid)
if not user["is_admin"]:
raise Exception("Not allowed")
channel = await self.services.channel.get(uid=channel_uid)
if not channel:
raise Exception("Channel not found")
channel['history_start'] = datetime.now()
await self.services.channel.save(channel)
return await self.services.channel_message.clear(channel_uid)
async def write_container(self, channel_uid, content,timeout=3):
async def write_container(self, channel_uid, content, timeout=3):
self._require_login()
channel_member = await self.services.channel_member.get(
channel_uid=channel_uid, user_uid=self.user_uid
)
if not channel_member:
raise Exception("Not allowed")
container_name = await self.services.container.get_container_name(channel_uid)
class SessionCall:
def __init__(self, app,channel_uid_uid, container_name):
self.app = app
self.channel_uid = channel_uid
self.container_name = container_name
def __init__(self, app, channel_uid, container_name):
self.app = app
self.channel_uid = channel_uid
self.container_name = container_name
self.time_last_output = time.time()
self.output = b''
async def stdout_event_handler(self, data):
self.time_last_output = time.time()
self.output += data
return True
return True
async def communicate(self,content, timeout=3):
async def communicate(self, content, timeout=3):
await self.app.services.container.add_event_listener(self.container_name, "stdout", self.stdout_event_handler)
await self.app.services.container.write_stdin(self.channel_uid, content)
while time.time() - self.time_last_output < timeout:
await asyncio.sleep(0.1)
await self.app.services.container.remove_event_listener(self.container_name, "stdout", self.stdout_event_handler)
return self.output
sc = SessionCall(self, channel_uid,container_name)
return (await sc.communicate(content)).decode("utf-8","ignore")
sc = SessionCall(self, channel_uid, container_name)
return (await sc.communicate(content)).decode("utf-8", "ignore")
async def get_container(self, channel_uid):
self._require_login()
@ -307,17 +315,32 @@ class RPCView(BaseView):
return True
async def _finalize_message_task(self, message_uid, timeout=5):
if self._finalize_task:
self._finalize_task.cancel()
await asyncio.sleep(timeout)
await self.services.chat.finalize(message_uid)
async def update_message_text(self, message_uid, text):
self._require_login()
message = await self.services.channel_message.get(message_uid, user_uid=None)
if message and message['message'] == text:
return message_uid
if not message or message["is_final"]:
return await self.create_message(text)
async with self.app.no_save():
self._require_login()
message = await self.services.channel_message.get(message_uid)
if message["user_uid"] != self.user_uid:
raise Exception("Not allowed")
if message.get_seconds_since_last_update() > 5:
if hasattr(message, "get_seconds_since_last_update"):
seconds_since_last_update = message.get_seconds_since_last_update()
else:
seconds_since_last_update = 0
if seconds_since_last_update > 5:
return {
"error": "Message too old",
"seconds_since_last_update": message.get_seconds_since_last_update(),
"seconds_since_last_update": seconds_since_last_update,
"success": False,
}
@ -328,7 +351,7 @@ class RPCView(BaseView):
message["deleted_at"] = None
await self.services.channel_message.save(message)
data = message.record
data = dict(message.record)
data["text"] = message["message"]
data["message_uid"] = message_uid
@ -337,31 +360,19 @@ class RPCView(BaseView):
{
"channel_uid": message["channel_uid"],
"event": "update_message_text",
"data": message.record,
"data": data,
},
)
return {"success": True}
if self._finalize_task:
self._finalize_task.cancel()
self._finalize_task = asyncio.create_task(self._finalize_message_task(message_uid, timeout=5))
async def clear_channel(self, channel_uid):
self._require_login()
user = await self.services.user.get(uid=self.user_uid)
if not user["is_admin"]:
raise Exception("Not allowed")
channel = await self.services.channel.get(uid=channel_uid)
if not channel:
raise Exception("Channel not found")
channel['history_start'] = datetime.now()
await self.services.channel.save(channel)
return await self.services.channel_message.clear(channel_uid)
return {"success": True}
async def send_message(self, channel_uid, message, is_final=True):
self._require_login()
message = await self.services.chat.send(
self.user_uid, channel_uid, message, is_final
)
return message["uid"]
await self.services.chat.send(self.user_uid, channel_uid, message, is_final)
async def echo(self, *args):
self._require_login()
@ -375,13 +386,7 @@ class RPCView(BaseView):
any(
keyword in lowercase
for keyword in [
"drop",
"alter",
"update",
"delete",
"replace",
"insert",
"truncate",
"drop", "alter", "update", "delete", "replace", "insert", "truncate"
]
)
and "select" not in lowercase
@ -391,25 +396,13 @@ class RPCView(BaseView):
dict(record) async for record in self.services.channel.query(args[0])
]
for record in records:
try:
del record["email"]
except KeyError:
pass
try:
del record["password"]
except KeyError:
pass
try:
del record["message"]
except:
pass
try:
del record["html"]
except:
pass
return [
dict(record) async for record in self.services.channel.query(args[0])
]
for field in ("email", "password", "message", "html"):
if field in record:
try:
del record[field]
except Exception:
pass
return records
async def __call__(self, data):
try:
@ -437,10 +430,9 @@ class RPCView(BaseView):
{"callId": call_id, "success": success, "data": result}
)
except Exception as ex:
print(str(ex), flush=True)
logger.exception(ex)
await self._send_json(
{"callId": call_id, "success": False, "data": str(ex)}
{"callId": data.get("callId"), "success": False, "data": str(ex)}
)
async def _send_json(self, obj):
@ -449,10 +441,9 @@ class RPCView(BaseView):
except Exception as ex:
await self.services.socket.delete(self.ws)
await self.ws.close()
async def get_online_users(self, channel_uid):
self._require_login()
results = [
record
async for record in self.services.channel.get_recent_users(channel_uid)
@ -460,13 +451,8 @@ class RPCView(BaseView):
results = sorted(results, key=lambda x: x["nick"])
return results
async def echo(self, obj):
await self.ws.send_json(obj)
return "noresponse"
async def get_recent_users(self, channel_uid):
self._require_login()
return [
{
"uid": record["uid"],
@ -479,7 +465,6 @@ class RPCView(BaseView):
async def get_users(self, channel_uid):
self._require_login()
return [
{
"uid": record["uid"],
@ -504,7 +489,6 @@ class RPCView(BaseView):
return {"pong": args}
async def stars_render(self, channel_uid, message):
for user in await self.get_online_users(channel_uid):
try:
await self.services.socket.send_to_user(
@ -515,7 +499,7 @@ class RPCView(BaseView):
},
)
except Exception as ex:
print(ex)
logger.exception(ex)
async def get(self):
scheduled = []
@ -538,7 +522,7 @@ class RPCView(BaseView):
await self.services.socket.subscribe(
ws, subscription["channel_uid"], self.request.session.get("uid")
)
if not scheduled and self.request.app.uptime_seconds < 5:
if not scheduled and getattr(self.request.app, "uptime_seconds", 0) < 5:
await schedule(
self.request.session.get("uid"),
0,
@ -547,7 +531,7 @@ class RPCView(BaseView):
await schedule(
self.request.session.get("uid"),
15,
{"event": "deployed", "data": {"uptime": self.request.app.uptime}},
{"event": "deployed", "data": {"uptime": getattr(self.request.app, "uptime", 0)}},
)
rpc = RPCView.RPCApi(self, ws)
@ -556,7 +540,6 @@ class RPCView(BaseView):
try:
await rpc(msg.json())
except Exception as ex:
print("Deleting socket", ex, flush=True)
logger.exception(ex)
await self.services.socket.delete(ws)
break
@ -565,3 +548,4 @@ class RPCView(BaseView):
elif msg.type == web.WSMsgType.CLOSE:
pass
return ws