Compare commits

..

3 Commits

Author SHA1 Message Date
BordedDev
51b4898078 Unformatted code 2025-08-08 18:45:42 +02:00
BordedDev
40f9646251 Fixed db related issues 2025-08-08 18:29:52 +02:00
5e99e894e9 Not working version, issues with get 2025-08-08 17:28:35 +02:00
15 changed files with 1296 additions and 136 deletions

View File

@ -40,7 +40,8 @@ dependencies = [
"pillow-heif",
"IP2Location",
"bleach",
"sentry-sdk"
"sentry-sdk",
"aiosqlite"
]
[tool.setuptools.packages.find]

View File

@ -6,11 +6,10 @@ import uuid
import signal
from datetime import datetime
from contextlib import asynccontextmanager
import aiohttp_debugtoolbar
from snek import snode
from snek.view.threads import ThreadsView
from snek.system.ads import AsyncDataSet
logging.basicConfig(level=logging.DEBUG)
from concurrent.futures import ThreadPoolExecutor
from ipaddress import ip_address
@ -108,7 +107,7 @@ async def ip2location_middleware(request, handler):
user["city"]
if user["city"] != location.city:
user["country_long"] = location.country
user["country_short"] = locaion.country_short
user["country_short"] = location.country_short
user["city"] = location.city
user["region"] = location.region
user["latitude"] = location.latitude
@ -143,6 +142,7 @@ class Application(BaseApplication):
client_max_size=1024 * 1024 * 1024 * 5 * args,
**kwargs,
)
self.db = AsyncDataSet(kwargs["db_path"].replace("sqlite:///", ""))
session_setup(self, EncryptedCookieStorage(SESSION_KEY))
self.tasks = asyncio.Queue()
self._middlewares.append(session_middleware)
@ -165,7 +165,7 @@ class Application(BaseApplication):
self.mappers = get_mappers(app=self)
self.broadcast_service = None
self.user_availability_service_task = None
self.setup_router()
base_path = pathlib.Path(__file__).parent
self.ip2location = IP2Location.IP2Location(
@ -175,9 +175,7 @@ class Application(BaseApplication):
self.on_startup.append(self.prepare_asyncio)
self.on_startup.append(self.start_user_availability_service)
self.on_startup.append(self.start_ssh_server)
self.on_startup.append(self.prepare_database)
#self.on_startup.append(self.prepare_database)
async def prepare_stats(self, app):
app['stats'] = create_stats_structure()
@ -245,21 +243,11 @@ class Application(BaseApplication):
except Exception as ex:
print(ex)
self.db.commit()
async def prepare_database(self, app):
self.db.query("PRAGMA journal_mode=WAL")
self.db.query("PRAGMA syncnorm=off")
try:
if not self.db["user"].has_index("username"):
self.db["user"].create_index("username", unique=True)
if not self.db["channel_member"].has_index(["channel_uid", "user_uid"]):
self.db["channel_member"].create_index(["channel_uid", "user_uid"])
if not self.db["channel_message"].has_index(["channel_uid", "user_uid"]):
self.db["channel_message"].create_index(["channel_uid", "user_uid"])
except:
pass
await self.db.query_raw("PRAGMA journal_mode=WAL")
await self.db.query_raw("PRAGMA syncnorm=off")
await self.services.drive.prepare_all()
self.loop.create_task(self.task_runner())
@ -290,9 +278,9 @@ class Application(BaseApplication):
self.router.add_view("/login.json", LoginView)
self.router.add_view("/register.html", RegisterView)
self.router.add_view("/register.json", RegisterView)
# self.router.add_view("/drive/{rel_path:.*}", DriveView)
## self.router.add_view("/drive.bin", UploadView)
# self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
self.router.add_view("/drive/{rel_path:.*}", DriveView)
self.router.add_view("/drive.bin", UploadView)
self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
self.router.add_view("/search-user.html", SearchUserView)
self.router.add_view("/search-user.json", SearchUserView)
self.router.add_view("/avatar/{uid}.svg", AvatarView)
@ -300,25 +288,25 @@ class Application(BaseApplication):
self.router.add_get("/http-photo", self.handle_http_photo)
self.router.add_get("/rpc.ws", RPCView)
self.router.add_get("/c/{channel:.*}", ChannelView)
#self.router.add_view(
# "/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
#)
#self.router.add_view(
# "/channel/{channel_uid}/drive.json", ChannelDriveApiView
#)
self.router.add_view(
"/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
)
self.router.add_view(
"/channel/{channel_uid}/drive.json", ChannelDriveApiView
)
self.router.add_view(
"/channel/{channel_uid}/attachment.sock", ChannelAttachmentUploadView
)
self.router.add_view(
"/channel/attachment/{relative_url:.*}", ChannelAttachmentView
)#
)
self.router.add_view("/channel/{channel}.html", WebView)
self.router.add_view("/threads.html", ThreadsView)
self.router.add_view("/terminal.ws", TerminalSocketView)
self.router.add_view("/terminal.html", TerminalView)
#self.router.add_view("/drive.json", DriveApiView)
#self.router.add_view("/drive.html", DriveView)
#self.router.add_view("/drive/{drive}.json", DriveView)
self.router.add_view("/drive.json", DriveApiView)
self.router.add_view("/drive.html", DriveView)
self.router.add_view("/drive/{drive}.json", DriveView)
self.router.add_get("/stats.html", stats_handler)
self.router.add_view("/stats.json", StatsView)
self.router.add_view("/user/{user}.html", UserView)
@ -424,11 +412,11 @@ class Application(BaseApplication):
self.jinja2_env.loader = await self.get_user_template_loader(
request.session.get("uid")
)
try:
context["nonce"] = request['csp_nonce']
except:
context['nonce'] = '?'
context['nonce'] = '?'
rendered = await super().render_template(template, request, context)
@ -463,7 +451,7 @@ class Application(BaseApplication):
async def get_user_template_loader(self, uid=None):
template_paths = []
for admin_uid in self.services.user.get_admin_uids():
for admin_uid in await self.services.user.get_admin_uids():
user_template_path = await self.services.user.get_template_path(admin_uid)
if user_template_path:
template_paths.append(user_template_path)
@ -475,7 +463,7 @@ class Application(BaseApplication):
template_paths.append(self.template_path)
return FileSystemLoader(template_paths)
@asynccontextmanager
async def no_save(self):
stats = {
@ -490,7 +478,7 @@ class Application(BaseApplication):
self.services.channel_message.mapper.save = patched_save
raised_exception = None
try:
yield
yield
except Exception as ex:
raised_exception = ex
finally:
@ -499,7 +487,6 @@ class Application(BaseApplication):
raise raised_exception
app = Application(db_path="sqlite:///snek.db")
#aiohttp_debugtoolbar.setup(app)
async def main():

View File

@ -6,11 +6,11 @@ class UserMapper(BaseMapper):
table_name = "user"
model_class = UserModel
def get_admin_uids(self):
async def get_admin_uids(self):
try:
return [
user["uid"]
for user in self.db.query(
for user in await self.db.query(
"SELECT uid FROM user WHERE is_admin = :is_admin",
{"is_admin": True},
)

View File

@ -23,7 +23,7 @@ class ChannelModel(BaseModel):
history_start_filter = f" AND created_at > '{self['history_start']}' "
try:
async for model in self.app.services.channel_message.query(
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY id DESC LIMIT 1",
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1",
{"channel_uid": self["uid"]},
):

View File

@ -1,21 +1,6 @@
from snek.system.service import BaseService
from snek.system.template import sanitize_html
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
import json
from jinja2 import Environment, FileSystemLoader
global jinja2_env
import pathlib
template_path = pathlib.Path(__file__).parent.parent.joinpath("templates")
def render(context):
template =jinja2_env.get_template("message.html")
return sanitize_html(template.render(**context))
class ChannelMessageService(BaseService):
mapper_name = "channel_message"
@ -23,19 +8,6 @@ class ChannelMessageService(BaseService):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._configured_indexes = False
self._executor_pools = {}
global jinja2_env
jinja2_env = self.app.jinja2_env
self._max_workers = 1
def get_or_create_executor(self, uid):
if not uid in self._executor_pools:
self._executor_pools[uid] = ProcessPoolExecutor(max_workers=5)
return self._executor_pools[uid]
def delete_executor(self, uid):
if uid in self._executor_pools:
self._executor_pools[uid].shutdown()
del self._executor_pools[uid]
async def maintenance(self):
args = {}
@ -57,12 +29,12 @@ class ChannelMessageService(BaseService):
)
if html != message["html"]:
print("Reredefined message", message["uid"])
except Exception as ex:
time.sleep(0.1)
print(ex, flush=True)
while True:
changed = 0
async for message in self.find(is_final=False):
@ -97,14 +69,10 @@ class ChannelMessageService(BaseService):
"color": user["color"],
}
)
loop = asyncio.get_event_loop()
try:
context = json.loads(json.dumps(context, default=str))
model["html"] = await loop.run_in_executor(self.get_or_create_executor(model["uid"]), render,context)
#model['html'] = await loop.run_in_executor(self.get_or_create_executor(user["uid"]), sanitize_html,model['html'])
template = self.app.jinja2_env.get_template("message.html")
model["html"] = template.render(**context)
model['html'] = sanitize_html(model['html'])
except Exception as ex:
print(ex, flush=True)
@ -123,8 +91,6 @@ class ChannelMessageService(BaseService):
["deleted_at"], unique=False
)
self._configured_indexes = True
if model['is_final']:
self.delete_executor(model['uid'])
return model
raise Exception(f"Failed to create channel message: {model.errors}.")
@ -136,7 +102,7 @@ class ChannelMessageService(BaseService):
#if not message["html"].startswith("<chat-message"):
#message = await self.get(uid=message["uid"])
#await self.save(message)
return {
"uid": message["uid"],
"color": user["color"],
@ -161,15 +127,10 @@ class ChannelMessageService(BaseService):
"color": user["color"],
}
)
context = json.loads(json.dumps(context, default=str))
loop = asyncio.get_event_loop()
model["html"] = await loop.run_in_executor(self.get_or_create_executor(model["uid"]), render, context)
#model['html'] = await loop.run_in_executor(self.get_or_create_executor(user["uid"]), sanitize_html,model['html'])
result = await super().save(model)
if model['is_final']:
self.delete_executor(model['uid'])
return result
template = self.app.jinja2_env.get_template("message.html")
model["html"] = template.render(**context)
model['html'] = sanitize_html(model['html'])
return await super().save(model)
async def offset(self, channel_uid, page=0, timestamp=None, page_size=30):
channel = await self.services.channel.get(uid=channel_uid)
@ -195,22 +156,22 @@ class ChannelMessageService(BaseService):
elif page > 0:
async for model in self.query(
f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid WHERE created_at < :timestamp {history_start_filter} ORDER BY created_at DESC LIMIT :page_size",
{
*{
"channel_uid": channel_uid,
"page_size": page_size,
"offset": offset,
"timestamp": timestamp,
},
}.values(),
):
results.append(model)
else:
async for model in self.query(
f"SELECT * FROM channel_message WHERE channel_uid=:channel_uid {history_start_filter} ORDER BY created_at DESC LIMIT :page_size OFFSET :offset",
{
*{
"channel_uid": channel_uid,
"page_size": page_size,
"offset": offset,
},
}.values(),
):
results.append(model)

View File

@ -44,29 +44,19 @@ class SocketService(BaseService):
async def user_availability_service(self):
logger.info("User availability update service started.")
logger.debug("Entering the main loop.")
while True:
logger.info("Updating user availability...")
logger.debug("Initializing users_updated list.")
users_updated = []
logger.debug("Iterating over sockets.")
for s in self.sockets:
logger.debug(f"Checking connection status for socket: {s}.")
if not s.is_connected:
logger.debug("Socket is not connected, continuing to next socket.")
continue
logger.debug(f"Checking if user {s.user} is already updated.")
if s.user not in users_updated:
logger.debug(f"Updating last_ping for user: {s.user}.")
s.user["last_ping"] = now()
logger.debug(f"Saving user {s.user} to the database.")
await self.app.services.user.save(s.user)
logger.debug(f"Adding user {s.user} to users_updated list.")
users_updated.append(s.user)
logger.info(
f"Updated user availability for {len(users_updated)} online users."
)
logger.debug("Sleeping for 60 seconds before the next update.")
await asyncio.sleep(60)
async def add(self, ws, user_uid):

View File

@ -101,6 +101,8 @@ class UserService(BaseService):
model.username.value = username
model.password.value = await security.hash(password)
if await self.save(model):
for x in range(10):
print("Jazeker!!!")
if model:
channel = await self.services.channel.ensure_public_channel(
model["uid"]

View File

@ -7,7 +7,8 @@ class UserPropertyService(BaseService):
mapper_name = "user_property"
async def set(self, user_uid, name, value):
self.mapper.db["user_property"].upsert(
self.mapper.db.upsert(
"user_property",
{
"user_uid": user_uid,
"name": name,

View File

@ -256,9 +256,7 @@ class MessageList extends HTMLElement {
this.querySelectorAll('.avatar').forEach((el) => {
const anchor = el.closest('a');
if (anchor && typeof anchor.href === 'string' && anchor.href.includes(uid)) {
if(!lastElement)
lastElement = el;
lastElement = el;
}
});
if (lastElement) {
@ -280,14 +278,10 @@ class MessageList extends HTMLElement {
upsertMessage(data) {
let message = this.messageMap.get(data.uid);
if (message && (data.is_final || !data.message)) {
//message.parentElement?.removeChild(message);
// TO force insert
//message = null;
}
if(message && !data.message){
message.parentElement?.removeChild(message);
// TO force insert
message = null;
}
if (!data.message) return;

1212
src/snek/system/ads.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
DEFAULT_LIMIT = 30
import asyncio
import typing
import traceback
from snek.system.model import BaseModel
@ -51,7 +51,9 @@ class BaseMapper:
kwargs["uid"] = uid
if not kwargs.get("deleted_at"):
kwargs["deleted_at"] = None
record = await self.run_in_executor(self.table.find_one, **kwargs)
#traceback.print_exc()
record = await self.db.get(self.table_name, kwargs)
if not record:
return None
record = dict(record)
@ -61,23 +63,29 @@ class BaseMapper:
return model
async def exists(self, **kwargs):
return await self.run_in_executor(self.table.exists, **kwargs)
return await self.db.count(self.table_name, kwargs)
#return await self.run_in_executor(self.table.exists, **kwargs)
async def count(self, **kwargs) -> int:
return await self.run_in_executor(self.table.count, **kwargs)
return await self.db.count(self.table_name,kwargs)
async def save(self, model: BaseModel) -> bool:
if not model.record.get("uid"):
raise Exception(f"Attempt to save without uid: {model.record}.")
model.updated_at.update()
return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
model.updated_at.update()
await self.upsert(model)
return model
#return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
async def find(self, **kwargs) -> typing.AsyncGenerator:
if not kwargs.get("_limit"):
kwargs["_limit"] = self.default_limit
if not kwargs.get("deleted_at"):
kwargs["deleted_at"] = None
for record in await self.run_in_executor(self.table.find, **kwargs):
for record in await self.db.find(self.table_name, kwargs):
model = await self.new()
for key, value in record.items():
model[key] = value
@ -88,21 +96,21 @@ class BaseMapper:
return "insert" in sql or "update" in sql or "delete" in sql
async def query(self, sql, *args):
for record in await self.run_in_executor(self.db.query, sql, *args, use_semaphore=await self._use_semaphore(sql)):
for record in await self.db.query(sql, *args):
yield dict(record)
async def update(self, model):
if not model["deleted_at"] is None:
raise Exception("Can't update deleted record.")
model.updated_at.update()
return await self.run_in_executor(self.table.update, model.record, ["uid"],use_semaphore=True)
return await self.db.update(self.table_name, model.record, {"uid": model["uid"]})
async def upsert(self, model):
model.updated_at.update()
return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
await self.db.upsert(self.table_name, model.record, {"uid": model["uid"]})
return model
async def delete(self, **kwargs) -> int:
if not kwargs or not isinstance(kwargs, dict):
raise Exception("Can't execute delete with no filter.")
kwargs["use_semaphore"] = True
return await self.run_in_executor(self.table.delete, **kwargs)
return await self.db.delete(self.table_name, kwargs)

View File

@ -53,7 +53,7 @@ async def auth_middleware(request, handler):
request["user"] = None
if request.session.get("uid") and request.session.get("logged_in"):
request["user"] = await request.app.services.user.get(
uid=request.app.session.get("uid")
uid=request.session.get("uid")
)
return await handler(request)
@ -69,5 +69,5 @@ async def cors_middleware(request, handler):
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
response.headers["Access-Control-Allow-Headers"] = "*"
response.headers["Access-Control-Allow-Credentials"] = "true"
return response

View File

@ -36,12 +36,12 @@ class BaseService:
return await self.mapper.new()
async def query(self, sql, *args):
for record in self.app.db.query(sql, *args):
for record in await self.app.db.query(sql, *args):
yield record
async def get(self, *args, **kwargs):
if not "deleted_at" in kwargs:
kwargs["deleted_at"] = None
kwargs["deleted_at"] = None
uid = kwargs.get("uid")
if args:
uid = args[0]
@ -50,7 +50,7 @@ class BaseService:
if result and result.__class__ == self.mapper.model_class:
return result
kwargs["uid"] = uid
print(kwargs,"ZZZZZZZ")
result = await self.mapper.get(**kwargs)
if result:
await self.cache.set(result["uid"], result)

View File

@ -529,8 +529,8 @@ class RPCView(BaseView):
try:
await self.ws.send_str(json.dumps(obj, default=str))
except Exception as ex:
print("THIS IS THE DeAL>",str(ex), flush=True)
await self.services.socket.delete(self.ws)
await self.ws.close()
async def get_online_users(self, channel_uid):
self._require_login()
@ -638,7 +638,7 @@ class RPCView(BaseView):
try:
await rpc(msg.json())
except Exception as ex:
print("XXXXXXXXXX Deleting socket", ex, flush=True)
print("Deleting socket", ex, flush=True)
logger.exception(ex)
await self.services.socket.delete(ws)
break

View File

@ -38,6 +38,10 @@ class WebView(BaseView):
channel = await self.services.channel.get(
uid=self.request.match_info.get("channel")
)
print(self.session.get("uid"),"ZZZZZZZZZZ")
qq = await self.services.user.get(uid=self.session.get("uid"))
print("GGGGGGGGGG",qq)
if not channel:
user = await self.services.user.get(
uid=self.request.match_info.get("channel")