From 44dd77cec5639575cb86973eceb8d174d570370c Mon Sep 17 00:00:00 2001 From: retoor <retoor@molodetz.nl> Date: Wed, 9 Apr 2025 15:12:34 +0200 Subject: [PATCH] Shed. --- src/snek/app.py | 6 +++--- src/snek/form/settings/profile.py | 19 +++++++++++++++---- src/snek/model/user_property.py | 3 --- src/snek/service/__init__.py | 3 ++- src/snek/service/user_property.py | 12 +++++------- src/snek/system/template.py | 8 +++----- src/snek/view/settings/index.py | 7 ++++--- src/snek/view/settings/profile.py | 18 ++++++++---------- src/snek/webdav.py | 29 +++++++++++++++-------------- 9 files changed, 55 insertions(+), 50 deletions(-) diff --git a/src/snek/app.py b/src/snek/app.py index ead9ff8..a019d74 100644 --- a/src/snek/app.py +++ b/src/snek/app.py @@ -92,10 +92,10 @@ class Application(BaseApplication): self.on_startup.append(self.prepare_asyncio) self.on_startup.append(self.prepare_database) - async def prepare_asyncio(self,app): - #app.loop = asyncio.get_running_loop() + async def prepare_asyncio(self, app): + # app.loop = asyncio.get_running_loop() app.executor = ThreadPoolExecutor(max_workers=200) - app.loop.set_default_executor(self.executor) + app.loop.set_default_executor(self.executor) async def create_task(self, task): await self.tasks.put(task) diff --git a/src/snek/form/settings/profile.py b/src/snek/form/settings/profile.py index 24eb884..836cd67 100644 --- a/src/snek/form/settings/profile.py +++ b/src/snek/form/settings/profile.py @@ -1,14 +1,25 @@ -from snek.system.form import Form, FormInputElement, FormButtonElement, HTMLElement +from snek.system.form import Form, FormButtonElement, FormInputElement, HTMLElement class SettingsProfileForm(Form): - nick = FormInputElement(name="nick", required=True, place_holder="Your Nickname", min_length=1, max_length=20) + nick = FormInputElement( + name="nick", + required=True, + place_holder="Your Nickname", + min_length=1, + max_length=20, + ) action = FormButtonElement( name="action", value="submit", text="Save", type="button" ) title = HTMLElement(tag="h1", text="Profile") - profile = FormInputElement(name="profile", place_holder="Tell about yourself.", required=False,max_length=300) + profile = FormInputElement( + name="profile", + place_holder="Tell about yourself.", + required=False, + max_length=300, + ) action = FormButtonElement( name="action", value="submit", text="Save", type="button" - ) \ No newline at end of file + ) diff --git a/src/snek/model/user_property.py b/src/snek/model/user_property.py index 7f0113c..1231423 100644 --- a/src/snek/model/user_property.py +++ b/src/snek/model/user_property.py @@ -1,5 +1,3 @@ -import mimetypes - from snek.system.model import BaseModel, ModelField @@ -7,4 +5,3 @@ class UserPropertyModel(BaseModel): user_uid = ModelField(name="user_uid", required=True, kind=str) name = ModelField(name="name", required=True, kind=str) value = ModelField(name="path", required=True, kind=str) - diff --git a/src/snek/service/__init__.py b/src/snek/service/__init__.py index f491e9b..3ec4592 100644 --- a/src/snek/service/__init__.py +++ b/src/snek/service/__init__.py @@ -9,9 +9,10 @@ from snek.service.drive_item import DriveItemService from snek.service.notification import NotificationService from snek.service.socket import SocketService from snek.service.user import UserService +from snek.service.user_property import UserPropertyService from snek.service.util import UtilService from snek.system.object import Object -from snek.service.user_property import UserPropertyService + @functools.cache def get_services(app): diff --git a/src/snek/service/user_property.py b/src/snek/service/user_property.py index 7531577..e95d62e 100644 --- a/src/snek/service/user_property.py +++ b/src/snek/service/user_property.py @@ -1,6 +1,5 @@ -import pathlib -import json -from snek.system import security +import json + from snek.system.service import BaseService @@ -14,12 +13,12 @@ class UserPropertyService(BaseService): prop["user_uid"] = user_uid prop["name"] = name - prop["value"] = json.dumps(value,default=str) + prop["value"] = json.dumps(value, default=str) return await self.save(prop) - + async def get(self, user_uid, name): try: - return json.loads((await self.get(user_uid=user_uid, name=name)).value) + return json.loads((await self.get(user_uid=user_uid, name=name)).value) except: return None @@ -31,4 +30,3 @@ class UserPropertyService(BaseService): async for result in self.find(name={"ilike": "%" + query + "%"}, **kwargs): results.append(result) return results - diff --git a/src/snek/system/template.py b/src/snek/system/template.py index 8a1202d..d4b6819 100644 --- a/src/snek/system/template.py +++ b/src/snek/system/template.py @@ -89,15 +89,13 @@ def set_link_target_blank(text): def embed_youtube(text): soup = BeautifulSoup(text, "html.parser") for element in soup.find_all("a"): - if ( - element.attrs["href"].startswith("https://www.you") - ): + if element.attrs["href"].startswith("https://www.you"): video_name = element.attrs["href"].split("/")[-1] if "v=" in element.attrs["href"]: video_name = element.attrs["href"].split("?v=")[1].split("&")[0] - #if "si=" in element.attrs["href"]: + # if "si=" in element.attrs["href"]: # video_name = "?v=" + element.attrs["href"].split("/")[-1] - #if "t=" in element.attrs["href"]: + # if "t=" in element.attrs["href"]: # video_name += "&t=" + element.attrs["href"].split("&t=")[1].split("&")[0] embed_template = f'<iframe width="560" height="315" style="display:block" src="https://www.youtube.com/embed/{video_name}" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" referrerpolicy="strict-origin-when-cross-origin" allowfullscreen></iframe>' element.replace_with(BeautifulSoup(embed_template, "html.parser")) diff --git a/src/snek/view/settings/index.py b/src/snek/view/settings/index.py index 1e45b56..418ef3d 100644 --- a/src/snek/view/settings/index.py +++ b/src/snek/view/settings/index.py @@ -1,8 +1,9 @@ -from snek.system.view import BaseView +from snek.system.view import BaseView + class SettingsIndexView(BaseView): - + login_required = True async def get(self): - return await self.render_template('settings/index.html') + return await self.render_template("settings/index.html") diff --git a/src/snek/view/settings/profile.py b/src/snek/view/settings/profile.py index 4e6638c..4a98a9a 100644 --- a/src/snek/view/settings/profile.py +++ b/src/snek/view/settings/profile.py @@ -1,7 +1,7 @@ -from snek.system.view import BaseView,BaseFormView +from aiohttp import web from snek.form.settings.profile import SettingsProfileForm -from aiohttp import web +from snek.system.view import BaseFormView class SettingsProfileView(BaseFormView): @@ -11,13 +11,12 @@ class SettingsProfileView(BaseFormView): async def get(self): form = self.form(app=self.app) - - if self.request.path.endswith(".json"): - form['nick'] = self.request['user']['nick'] - return web.json_response(await form.to_json()) - - user = await self.services.user.get(uid=self.session.get("uid")) + if self.request.path.endswith(".json"): + form["nick"] = self.request["user"]["nick"] + return web.json_response(await form.to_json()) + + user = await self.services.user.get(uid=self.session.get("uid")) return await self.render_template( "settings/profile.html", {"form": await form.to_json(), "user": user} @@ -28,9 +27,8 @@ class SettingsProfileView(BaseFormView): form.set_user_data(post["form"]) if await form.is_valid: - user = self.request['user'] + user = self.request["user"] user["nick"] = form["nick"] await self.services.user.save(user) return {"redirect_url": "/settings/profile.html"} return {"is_valid": False} - diff --git a/src/snek/webdav.py b/src/snek/webdav.py index f982e77..4c57fab 100755 --- a/src/snek/webdav.py +++ b/src/snek/webdav.py @@ -1,8 +1,7 @@ import logging import pathlib -import asyncio + logging.basicConfig(level=logging.DEBUG) -from app.cache import time_cache,time_cache_async import base64 import datetime import mimetypes @@ -13,8 +12,10 @@ import uuid import aiofiles import aiohttp import aiohttp.web +from app.cache import time_cache_async from lxml import etree + @aiohttp.web.middleware async def debug_middleware(request, handler): print(request.method, request.path, request.headers) @@ -26,13 +27,14 @@ async def debug_middleware(request, handler): pass return result + class WebdavApplication(aiohttp.web.Application): def __init__(self, parent, *args, **kwargs): middlewares = [debug_middleware] super().__init__(middlewares=middlewares, *args, **kwargs) self.locks = {} - + self.relative_url = "/webdav" self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options) @@ -182,10 +184,10 @@ class WebdavApplication(aiohttp.web.Application): @time_cache_async(10) async def get_file_size(self, path): - loop = self.parent.loop - stat = await loop.run_in_executor(None,os.stat, path) + loop = self.parent.loop + stat = await loop.run_in_executor(None, os.stat, path) return stat.st_size - + @time_cache_async(10) async def get_directory_size(self, directory): total_size = 0 @@ -196,11 +198,10 @@ class WebdavApplication(aiohttp.web.Application): total_size += await self.get_file_size(str(fp)) return total_size - - @time_cache_async(30) + @time_cache_async(30) async def get_disk_free_space(self, path="/"): - loop = self.parent.loop - statvfs = await loop.run_in_executor(None,os.statvfs, path) + loop = self.parent.loop + statvfs = await loop.run_in_executor(None, os.statvfs, path) return statvfs.f_bavail * statvfs.f_frsize async def create_node(self, request, response_xml, full_path, depth): @@ -208,9 +209,9 @@ class WebdavApplication(aiohttp.web.Application): relative_path = str(full_path.relative_to(request["home"])) href_path = f"{self.relative_url}/{relative_path}".strip(".") - href_path = href_path.replace("./","/") + href_path = href_path.replace("./", "/") href_path = href_path.replace("//", "/") - + response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") href.text = href_path @@ -270,7 +271,7 @@ class WebdavApplication(aiohttp.web.Application): pass requested_path = request.match_info.get("filename", "") - + abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Directory not found") @@ -316,7 +317,7 @@ class WebdavApplication(aiohttp.web.Application): resource = request.match_info.get("filename", "/") lock_token = request.headers.get("Lock-Token", "").replace( "opaquelocktoken:", "" - )[1:-1] + )[1:-1] if self.locks.get(resource) == lock_token: del self.locks[resource] return aiohttp.web.Response(status=204)