Compare commits
4 Commits
5e99e894e9
...
b9b31a494a
| Author | SHA1 | Date | |
|---|---|---|---|
| b9b31a494a | |||
| b961954aa1 | |||
| 84287808c8 | |||
| 692272e3ca |
@ -40,8 +40,7 @@ dependencies = [
|
||||
"pillow-heif",
|
||||
"IP2Location",
|
||||
"bleach",
|
||||
"sentry-sdk",
|
||||
"aiosqlite"
|
||||
"sentry-sdk"
|
||||
]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
|
||||
@ -6,10 +6,11 @@ import uuid
|
||||
import signal
|
||||
from datetime import datetime
|
||||
from contextlib import asynccontextmanager
|
||||
import aiohttp_debugtoolbar
|
||||
|
||||
from snek import snode
|
||||
from snek.view.threads import ThreadsView
|
||||
from snek.system.ads import AsyncDataSet
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from ipaddress import ip_address
|
||||
@ -142,7 +143,6 @@ class Application(BaseApplication):
|
||||
client_max_size=1024 * 1024 * 1024 * 5 * args,
|
||||
**kwargs,
|
||||
)
|
||||
self.db = AsyncDataSet(kwargs["db_path"].replace("sqlite:///", ""))
|
||||
session_setup(self, EncryptedCookieStorage(SESSION_KEY))
|
||||
self.tasks = asyncio.Queue()
|
||||
self._middlewares.append(session_middleware)
|
||||
@ -175,8 +175,10 @@ class Application(BaseApplication):
|
||||
self.on_startup.append(self.prepare_asyncio)
|
||||
self.on_startup.append(self.start_user_availability_service)
|
||||
self.on_startup.append(self.start_ssh_server)
|
||||
#self.on_startup.append(self.prepare_database)
|
||||
self.on_startup.append(self.prepare_database)
|
||||
|
||||
|
||||
|
||||
async def prepare_stats(self, app):
|
||||
app['stats'] = create_stats_structure()
|
||||
print("Stats prepared", flush=True)
|
||||
@ -246,8 +248,18 @@ class Application(BaseApplication):
|
||||
|
||||
|
||||
async def prepare_database(self, app):
|
||||
await self.db.query_raw("PRAGMA journal_mode=WAL")
|
||||
await self.db.query_raw("PRAGMA syncnorm=off")
|
||||
self.db.query("PRAGMA journal_mode=WAL")
|
||||
self.db.query("PRAGMA syncnorm=off")
|
||||
|
||||
try:
|
||||
if not self.db["user"].has_index("username"):
|
||||
self.db["user"].create_index("username", unique=True)
|
||||
if not self.db["channel_member"].has_index(["channel_uid", "user_uid"]):
|
||||
self.db["channel_member"].create_index(["channel_uid", "user_uid"])
|
||||
if not self.db["channel_message"].has_index(["channel_uid", "user_uid"]):
|
||||
self.db["channel_message"].create_index(["channel_uid", "user_uid"])
|
||||
except:
|
||||
pass
|
||||
|
||||
await self.services.drive.prepare_all()
|
||||
self.loop.create_task(self.task_runner())
|
||||
@ -278,9 +290,9 @@ class Application(BaseApplication):
|
||||
self.router.add_view("/login.json", LoginView)
|
||||
self.router.add_view("/register.html", RegisterView)
|
||||
self.router.add_view("/register.json", RegisterView)
|
||||
self.router.add_view("/drive/{rel_path:.*}", DriveView)
|
||||
self.router.add_view("/drive.bin", UploadView)
|
||||
self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
|
||||
# self.router.add_view("/drive/{rel_path:.*}", DriveView)
|
||||
## self.router.add_view("/drive.bin", UploadView)
|
||||
# self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
|
||||
self.router.add_view("/search-user.html", SearchUserView)
|
||||
self.router.add_view("/search-user.json", SearchUserView)
|
||||
self.router.add_view("/avatar/{uid}.svg", AvatarView)
|
||||
@ -288,25 +300,25 @@ class Application(BaseApplication):
|
||||
self.router.add_get("/http-photo", self.handle_http_photo)
|
||||
self.router.add_get("/rpc.ws", RPCView)
|
||||
self.router.add_get("/c/{channel:.*}", ChannelView)
|
||||
self.router.add_view(
|
||||
"/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
|
||||
)
|
||||
self.router.add_view(
|
||||
"/channel/{channel_uid}/drive.json", ChannelDriveApiView
|
||||
)
|
||||
#self.router.add_view(
|
||||
# "/channel/{channel_uid}/attachment.bin", ChannelAttachmentView
|
||||
#)
|
||||
#self.router.add_view(
|
||||
# "/channel/{channel_uid}/drive.json", ChannelDriveApiView
|
||||
#)
|
||||
self.router.add_view(
|
||||
"/channel/{channel_uid}/attachment.sock", ChannelAttachmentUploadView
|
||||
)
|
||||
self.router.add_view(
|
||||
"/channel/attachment/{relative_url:.*}", ChannelAttachmentView
|
||||
)
|
||||
)#
|
||||
self.router.add_view("/channel/{channel}.html", WebView)
|
||||
self.router.add_view("/threads.html", ThreadsView)
|
||||
self.router.add_view("/terminal.ws", TerminalSocketView)
|
||||
self.router.add_view("/terminal.html", TerminalView)
|
||||
self.router.add_view("/drive.json", DriveApiView)
|
||||
self.router.add_view("/drive.html", DriveView)
|
||||
self.router.add_view("/drive/{drive}.json", DriveView)
|
||||
#self.router.add_view("/drive.json", DriveApiView)
|
||||
#self.router.add_view("/drive.html", DriveView)
|
||||
#self.router.add_view("/drive/{drive}.json", DriveView)
|
||||
self.router.add_get("/stats.html", stats_handler)
|
||||
self.router.add_view("/stats.json", StatsView)
|
||||
self.router.add_view("/user/{user}.html", UserView)
|
||||
@ -487,6 +499,7 @@ class Application(BaseApplication):
|
||||
raise raised_exception
|
||||
|
||||
app = Application(db_path="sqlite:///snek.db")
|
||||
#aiohttp_debugtoolbar.setup(app)
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
@ -23,7 +23,7 @@ class ChannelModel(BaseModel):
|
||||
history_start_filter = f" AND created_at > '{self['history_start']}' "
|
||||
try:
|
||||
async for model in self.app.services.channel_message.query(
|
||||
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY created_at DESC LIMIT 1",
|
||||
"SELECT uid FROM channel_message WHERE channel_uid=:channel_uid" + history_start_filter + " ORDER BY id DESC LIMIT 1",
|
||||
{"channel_uid": self["uid"]},
|
||||
):
|
||||
|
||||
|
||||
@ -1,6 +1,11 @@
|
||||
from snek.system.service import BaseService
|
||||
from snek.system.template import sanitize_html
|
||||
import time
|
||||
import asyncio
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
executor = ThreadPoolExecutor(max_workers=50)
|
||||
|
||||
|
||||
class ChannelMessageService(BaseService):
|
||||
mapper_name = "channel_message"
|
||||
@ -69,10 +74,11 @@ class ChannelMessageService(BaseService):
|
||||
"color": user["color"],
|
||||
}
|
||||
)
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
template = self.app.jinja2_env.get_template("message.html")
|
||||
model["html"] = template.render(**context)
|
||||
model['html'] = sanitize_html(model['html'])
|
||||
model["html"] = await loop.run_in_executor(executor, lambda: template.render(**context))
|
||||
model['html'] = await loop.run_in_executor(executor, lambda:sanitize_html(model['html']))
|
||||
except Exception as ex:
|
||||
print(ex, flush=True)
|
||||
|
||||
@ -128,8 +134,10 @@ class ChannelMessageService(BaseService):
|
||||
}
|
||||
)
|
||||
template = self.app.jinja2_env.get_template("message.html")
|
||||
model["html"] = template.render(**context)
|
||||
model['html'] = sanitize_html(model['html'])
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
model["html"] = await loop.run_in_executor(executor, lambda: template.render(**context))
|
||||
model['html'] = await loop.run_in_executor(executor, lambda: sanitize_html(model['html']))
|
||||
return await super().save(model)
|
||||
|
||||
async def offset(self, channel_uid, page=0, timestamp=None, page_size=30):
|
||||
|
||||
@ -44,19 +44,29 @@ class SocketService(BaseService):
|
||||
|
||||
async def user_availability_service(self):
|
||||
logger.info("User availability update service started.")
|
||||
logger.debug("Entering the main loop.")
|
||||
while True:
|
||||
logger.info("Updating user availability...")
|
||||
logger.debug("Initializing users_updated list.")
|
||||
users_updated = []
|
||||
logger.debug("Iterating over sockets.")
|
||||
for s in self.sockets:
|
||||
logger.debug(f"Checking connection status for socket: {s}.")
|
||||
if not s.is_connected:
|
||||
logger.debug("Socket is not connected, continuing to next socket.")
|
||||
continue
|
||||
logger.debug(f"Checking if user {s.user} is already updated.")
|
||||
if s.user not in users_updated:
|
||||
logger.debug(f"Updating last_ping for user: {s.user}.")
|
||||
s.user["last_ping"] = now()
|
||||
logger.debug(f"Saving user {s.user} to the database.")
|
||||
await self.app.services.user.save(s.user)
|
||||
logger.debug(f"Adding user {s.user} to users_updated list.")
|
||||
users_updated.append(s.user)
|
||||
logger.info(
|
||||
f"Updated user availability for {len(users_updated)} online users."
|
||||
)
|
||||
logger.debug("Sleeping for 60 seconds before the next update.")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
async def add(self, ws, user_uid):
|
||||
|
||||
@ -101,8 +101,6 @@ class UserService(BaseService):
|
||||
model.username.value = username
|
||||
model.password.value = await security.hash(password)
|
||||
if await self.save(model):
|
||||
for x in range(10):
|
||||
print("Jazeker!!!")
|
||||
if model:
|
||||
channel = await self.services.channel.ensure_public_channel(
|
||||
model["uid"]
|
||||
|
||||
@ -7,8 +7,7 @@ class UserPropertyService(BaseService):
|
||||
mapper_name = "user_property"
|
||||
|
||||
async def set(self, user_uid, name, value):
|
||||
self.mapper.db.upsert(
|
||||
"user_property",
|
||||
self.mapper.db["user_property"].upsert(
|
||||
{
|
||||
"user_uid": user_uid,
|
||||
"name": name,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,7 +1,7 @@
|
||||
DEFAULT_LIMIT = 30
|
||||
import asyncio
|
||||
import typing
|
||||
import traceback
|
||||
|
||||
from snek.system.model import BaseModel
|
||||
|
||||
|
||||
@ -51,9 +51,7 @@ class BaseMapper:
|
||||
kwargs["uid"] = uid
|
||||
if not kwargs.get("deleted_at"):
|
||||
kwargs["deleted_at"] = None
|
||||
#traceback.print_exc()
|
||||
|
||||
record = await self.db.get(self.table_name, kwargs)
|
||||
record = await self.run_in_executor(self.table.find_one, **kwargs)
|
||||
if not record:
|
||||
return None
|
||||
record = dict(record)
|
||||
@ -63,29 +61,23 @@ class BaseMapper:
|
||||
return model
|
||||
|
||||
async def exists(self, **kwargs):
|
||||
return await self.db.count(self.table_name, kwargs)
|
||||
|
||||
|
||||
#return await self.run_in_executor(self.table.exists, **kwargs)
|
||||
return await self.run_in_executor(self.table.exists, **kwargs)
|
||||
|
||||
async def count(self, **kwargs) -> int:
|
||||
return await self.db.count(self.table_name,kwargs)
|
||||
|
||||
return await self.run_in_executor(self.table.count, **kwargs)
|
||||
|
||||
async def save(self, model: BaseModel) -> bool:
|
||||
if not model.record.get("uid"):
|
||||
raise Exception(f"Attempt to save without uid: {model.record}.")
|
||||
model.updated_at.update()
|
||||
await self.upsert(model)
|
||||
return model
|
||||
#return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
|
||||
model.updated_at.update()
|
||||
return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
|
||||
|
||||
async def find(self, **kwargs) -> typing.AsyncGenerator:
|
||||
if not kwargs.get("_limit"):
|
||||
kwargs["_limit"] = self.default_limit
|
||||
if not kwargs.get("deleted_at"):
|
||||
kwargs["deleted_at"] = None
|
||||
for record in await self.db.find(self.table_name, kwargs):
|
||||
for record in await self.run_in_executor(self.table.find, **kwargs):
|
||||
model = await self.new()
|
||||
for key, value in record.items():
|
||||
model[key] = value
|
||||
@ -96,21 +88,21 @@ class BaseMapper:
|
||||
return "insert" in sql or "update" in sql or "delete" in sql
|
||||
|
||||
async def query(self, sql, *args):
|
||||
for record in await self.db.query(sql, *args):
|
||||
for record in await self.run_in_executor(self.db.query, sql, *args, use_semaphore=await self._use_semaphore(sql)):
|
||||
yield dict(record)
|
||||
|
||||
async def update(self, model):
|
||||
if not model["deleted_at"] is None:
|
||||
raise Exception("Can't update deleted record.")
|
||||
model.updated_at.update()
|
||||
return await self.db.update(self.table_name, model.record, {"uid": model["uid"]})
|
||||
return await self.run_in_executor(self.table.update, model.record, ["uid"],use_semaphore=True)
|
||||
|
||||
async def upsert(self, model):
|
||||
model.updated_at.update()
|
||||
await self.db.upsert(self.table_name, model.record, {"uid": model["uid"]})
|
||||
return model
|
||||
return await self.run_in_executor(self.table.upsert, model.record, ["uid"],use_semaphore=True)
|
||||
|
||||
async def delete(self, **kwargs) -> int:
|
||||
if not kwargs or not isinstance(kwargs, dict):
|
||||
raise Exception("Can't execute delete with no filter.")
|
||||
return await self.db.delete(self.table_name, kwargs)
|
||||
kwargs["use_semaphore"] = True
|
||||
return await self.run_in_executor(self.table.delete, **kwargs)
|
||||
|
||||
@ -50,7 +50,7 @@ class BaseService:
|
||||
if result and result.__class__ == self.mapper.model_class:
|
||||
return result
|
||||
kwargs["uid"] = uid
|
||||
print(kwargs,"ZZZZZZZ")
|
||||
|
||||
result = await self.mapper.get(**kwargs)
|
||||
if result:
|
||||
await self.cache.set(result["uid"], result)
|
||||
|
||||
@ -529,8 +529,8 @@ class RPCView(BaseView):
|
||||
try:
|
||||
await self.ws.send_str(json.dumps(obj, default=str))
|
||||
except Exception as ex:
|
||||
print("THIS IS THE DeAL>",str(ex), flush=True)
|
||||
await self.services.socket.delete(self.ws)
|
||||
await self.ws.close()
|
||||
|
||||
async def get_online_users(self, channel_uid):
|
||||
self._require_login()
|
||||
@ -638,7 +638,7 @@ class RPCView(BaseView):
|
||||
try:
|
||||
await rpc(msg.json())
|
||||
except Exception as ex:
|
||||
print("Deleting socket", ex, flush=True)
|
||||
print("XXXXXXXXXX Deleting socket", ex, flush=True)
|
||||
logger.exception(ex)
|
||||
await self.services.socket.delete(ws)
|
||||
break
|
||||
|
||||
@ -38,10 +38,6 @@ class WebView(BaseView):
|
||||
channel = await self.services.channel.get(
|
||||
uid=self.request.match_info.get("channel")
|
||||
)
|
||||
print(self.session.get("uid"),"ZZZZZZZZZZ")
|
||||
qq = await self.services.user.get(uid=self.session.get("uid"))
|
||||
|
||||
print("GGGGGGGGGG",qq)
|
||||
if not channel:
|
||||
user = await self.services.user.get(
|
||||
uid=self.request.match_info.get("channel")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user