progress.

This commit is contained in:
retoor 2025-05-09 14:08:46 +02:00
parent 7e8ae1632d
commit 95ad49df43
9 changed files with 255 additions and 6 deletions
src/snek

View File

@ -38,6 +38,7 @@ from snek.view.login import LoginView
from snek.view.logout import LogoutView
from snek.view.register import RegisterView
from snek.view.rpc import RPCView
from snek.view.repository import RepositoryView
from snek.view.search_user import SearchUserView
from snek.view.settings.repositories import RepositoriesIndexView
from snek.view.settings.repositories import RepositoriesCreateView
@ -164,6 +165,7 @@ class Application(BaseApplication):
self.router.add_view("/login.json", LoginView)
self.router.add_view("/register.html", RegisterView)
self.router.add_view("/register.json", RegisterView)
self.router.add_view("/drive/{rel_path:.*}", DriveView)
self.router.add_view("/drive.bin", UploadView)
self.router.add_view("/drive.bin/{uid}.{ext}", UploadView)
self.router.add_view("/search-user.html", SearchUserView)
@ -180,6 +182,8 @@ class Application(BaseApplication):
self.router.add_view("/drive/{drive}.json", DriveView)
self.router.add_view("/stats.json", StatsView)
self.router.add_view("/user/{user}.html", UserView)
self.router.add_view("/repository/{username}/{repo_name}", RepositoryView)
self.router.add_view("/repository/{username}/{repo_name}/{rel_path:.*}", RepositoryView)
self.router.add_view("/settings/repositories/index.html", RepositoriesIndexView)
self.router.add_view("/settings/repositories/create.html", RepositoriesCreateView)
self.router.add_view("/settings/repositories/repository/{name}/update.html", RepositoriesUpdateView)

View File

@ -9,6 +9,7 @@ class DriveItemModel(BaseModel):
path = ModelField(name="path", required=True, kind=str)
file_type = ModelField(name="file_type", required=True, kind=str)
file_size = ModelField(name="file_size", required=True, kind=int)
is_available = ModelField(name="is_available", required=True, kind=bool, initial_value=True)
@property
def extension(self):

View File

@ -7,6 +7,9 @@ from snek.system.service import BaseService
class UserService(BaseService):
mapper_name = "user"
async def get_by_username(self, username):
return await self.get(username=username)
async def search(self, query, **kwargs):
query = query.strip().lower()
if not query:

View File

@ -107,6 +107,7 @@ class GitApplication(web.Application):
error_response = self.check_repo_exists(repository_path, repo_name)
if error_response:
return error_response
#'''
try:
shutil.rmtree(self.repo_path(repository_path, repo_name))
logger.info(f"Deleted repository: {repo_name} for user {username}")

View File

@ -15,8 +15,15 @@
<script src="/generic-form.js" type="module"></script>
<script src="/html-frame.js" type="module"></script>
<script src="/app.js" type="module"></script>
<script src="/file-manager.js" type="module"></script>
<link rel="stylesheet" href="/base.css">
<link
rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css"
integrity="sha512-pBMV+3tn6+5xAZuhI6tyCmQkXh15riZDqGPxAx/U+FuiI5Dh3ZTjM23cZqQ25jJCfi8+ka9gzC2ukNkGkP/Aw=="
crossorigin="anonymous"
referrerpolicy="no-referrer"
/>
<link rel="icon" type="image/png" href="/image/snek1.png" sizes="32x32">
<script defer src="https://umami.molodetz.nl/script.js" data-website-id="d127c3e4-dc70-4041-a1c8-bcc32c2492ea"></script>
</head>

View File

@ -5,7 +5,6 @@
{% block header_text %}<h2 style="color:#fff">Search</h2>{% endblock %}
{% block main %}
<section class="chat-area">
<div class="chat-header">
<h2>Search user</h2>

View File

@ -83,10 +83,10 @@
</span>
</div>
<div class="actions">
<a class="button browse" href="/repositories/{{ user.username }}/{{ repo.name }}" target="_blank">
<a class="button browse" href="/repository/{{ user.username.value }}/{{ repo.name }}" target="_blank">
<i class="fa-solid fa-folder-open"></i> Browse
</a>
<a class="button clone" href="/repositories/{{ user.username }}/{{ repo.name }}/clone">
<a class="button clone" href="/git/{{ user.uid.value }}/{{ repo.name.value }}">
<i class="fa-solid fa-code-branch"></i> Clone
</a>
<a class="button edit" href="/settings/repositories/repository/{{ repo.name }}/update.html">

View File

@ -3,18 +3,250 @@ from aiohttp import web
from snek.system.view import BaseView
import os
import mimetypes
from aiohttp import web
from urllib.parse import unquote, quote
from datetime import datetime
"""Run with: python server.py (Python  3.9)
Visit http://localhost:8080 to try the demo.
"""
from aiohttp import web
from pathlib import Path
import mimetypes, urllib.parse
# ---------- Configuration --------------------------------------------------
BASE_DIR = Path(__file__).parent.resolve()
ROOT_DIR = (BASE_DIR / "storage").resolve() # files shown to the outside world
ASSETS_DIR = (BASE_DIR / "assets").resolve() # JS & demo HTML
ROOT_DIR.mkdir(exist_ok=True)
ASSETS_DIR.mkdir(exist_ok=True)
# ---------- Helpers --------------------------------------------------------
def safe_resolve_path(rel: str) -> Path:
"""Return *absolute* path inside ROOT_DIR or raise FileNotFoundError."""
target = (ROOT_DIR / rel.lstrip("/")).resolve()
if target == ROOT_DIR or ROOT_DIR in target.parents:
return target
raise FileNotFoundError("Unsafe path")
# ---------- API view -------------------------------------------------------
class DriveView(BaseView):
async def get(self):
rel = self.request.query.get("path", "")
offset = int(self.request.query.get("offset", 0))
limit = int(self.request.query.get("limit", 20))
target = await self.services.user.get_home_folder(self.session.get("uid"))
if rel:
target.joinpath(rel)
if not target.exists():
return web.json_response({"error": "Not found"}, status=404)
# ---- Directory listing -------------------------------------------
if target.is_dir():
entries = []
# Directories first, then files both alphabetical (caseinsensitive)
for p in sorted(target.iterdir(), key=lambda p: (p.is_file(), p.name.lower())):
item_path = (Path(rel) / p.name).as_posix()
mime = mimetypes.guess_type(p.name)[0] if p.is_file() else "inode/directory"
url = (self.request.url.with_path(f"/drive/{urllib.parse.quote(item_path)}")
if p.is_file() else None)
entries.append({
"name": p.name,
"type": "directory" if p.is_dir() else "file",
"mimetype": mime,
"size": p.stat().st_size if p.is_file() else None,
"path": item_path,
"url": url,
})
import json
total = len(entries)
items = entries[offset:offset+limit]
return web.json_response({
"items": json.loads(json.dumps(items,default=str)),
"pagination": {"offset": offset, "limit": limit, "total": total}
})
with open(target, "rb") as f:
content = f.read()
return web.Response(body=content, content_type=mimetypes.guess_type(target.name)[0])
# ---- Single file metadata ----------------------------------------
url = self.request.url.with_path(f"/drive/{urllib.parse.quote(rel)}")
return web.json_response({
"name": target.name,
"type": "file",
"mimetype": mimetypes.guess_type(target.name)[0],
"size": target.stat().st_size,
"path": rel,
"url": str(url),
})
class DriveView222(BaseView):
PAGE_SIZE = 20
async def base_path(self):
return await self.services.user.get_home_folder(self.session.get("uid"))
async def get_full_path(self, rel_path):
base_path = await self.base_path()
safe_path = os.path.normpath(unquote(rel_path or ""))
full_path = os.path.abspath(os.path.join(base_path, safe_path))
if not full_path.startswith(os.path.abspath(base_path)):
raise web.HTTPForbidden(reason="Invalid path")
return full_path
async def make_absolute_url(self, rel_path):
rel_path = rel_path.lstrip("/")
url = str(self.request.url.with_path(f"/drive/{quote(rel_path)}"))
return url
async def entry_details(self, dir_path, entry, parent_rel_path):
entry_path = os.path.join(dir_path, entry)
stat = os.stat(entry_path)
is_dir = os.path.isdir(entry_path)
mimetype = None if is_dir else (mimetypes.guess_type(entry_path)[0] or "application/octet-stream")
size = stat.st_size if not is_dir else None
created_at = datetime.fromtimestamp(stat.st_ctime).isoformat()
updated_at = datetime.fromtimestamp(stat.st_mtime).isoformat()
rel_entry_path = os.path.join(parent_rel_path, entry).replace("\\", "/")
return {
"name": entry,
"type": "dir" if is_dir else "file",
"mimetype": mimetype,
"size": size,
"created_at": created_at,
"updated_at": updated_at,
"absolute_url": await self.make_absolute_url(rel_entry_path),
}
async def get(self):
rel_path = self.request.match_info.get("rel_path", "")
full_path = await self.get_full_path(rel_path)
page = int(self.request.query.get("page", 1))
page_size = int(self.request.query.get("page_size", self.PAGE_SIZE))
abs_url = await self.make_absolute_url(rel_path)
if not os.path.exists(full_path):
raise web.HTTPNotFound(reason="Path not found")
if os.path.isdir(full_path):
entries = os.listdir(full_path)
entries.sort()
start = (page - 1) * page_size
end = start + page_size
paged_entries = entries[start:end]
details = [await self.entry_details(full_path, entry, rel_path) for entry in paged_entries]
return web.json_response({
"path": rel_path,
"absolute_url": abs_url,
"entries": details,
"total": len(entries),
"page": page,
"page_size": page_size,
})
else:
with open(full_path, "rb") as f:
content = f.read()
mimetype = mimetypes.guess_type(full_path)[0] or "application/octet-stream"
headers = {"X-Absolute-Url": abs_url}
return web.Response(body=content, content_type=mimetype, headers=headers)
async def post(self):
rel_path = self.request.match_info.get("rel_path", "")
full_path = await self.get_full_path(rel_path)
abs_url = await self.make_absolute_url(rel_path)
if os.path.exists(full_path):
raise web.HTTPConflict(reason="File or directory already exists")
data = await self.request.post()
if data.get("type") == "dir":
os.makedirs(full_path)
return web.json_response({"status": "created", "type": "dir", "absolute_url": abs_url})
else:
file_field = data.get("file")
if not file_field:
raise web.HTTPBadRequest(reason="No file uploaded")
with open(full_path, "wb") as f:
f.write(file_field.file.read())
return web.json_response({"status": "created", "type": "file", "absolute_url": abs_url})
async def put(self):
rel_path = self.request.match_info.get("rel_path", "")
full_path = await self.get_full_path(rel_path)
abs_url = await self.make_absolute_url(rel_path)
if not os.path.exists(full_path):
raise web.HTTPNotFound(reason="File not found")
if os.path.isdir(full_path):
raise web.HTTPBadRequest(reason="Cannot overwrite directory")
body = await self.request.read()
with open(full_path, "wb") as f:
f.write(body)
return web.json_response({"status": "updated", "absolute_url": abs_url})
async def delete(self):
rel_path = self.request.match_info.get("rel_path", "")
full_path = await self.get_full_path(rel_path)
abs_url = await self.make_absolute_url(rel_path)
if not os.path.exists(full_path):
raise web.HTTPNotFound(reason="Path not found")
if os.path.isdir(full_path):
os.rmdir(full_path)
return web.json_response({"status": "deleted", "type": "dir", "absolute_url": abs_url})
else:
os.remove(full_path)
return web.json_response({"status": "deleted", "type": "file", "absolute_url": abs_url})
class DriveViewi2(BaseView):
login_required = True
async def get(self):
drive_uid = self.request.match_info.get("drive")
before = self.request.query.get("before")
filters = {}
if before:
filters["created_at__lt"] = before
if drive_uid:
filters['drive_uid'] = drive_uid
drive = await self.services.drive.get(uid=drive_uid)
drive_items = []
async for item in drive.items:
async for item in self.services.drive_item.find(**filters):
record = item.record
record["url"] = "/drive.bin/" + record["uid"] + "." + item.extension
drive_items.append(record)

View File

@ -15,8 +15,10 @@ class RepositoriesIndexView(BaseFormView):
repositories = []
async for repository in self.services.repository.find(user_uid=user_uid):
repositories.append(repository.record)
user = await self.services.user.get(uid=self.session.get("uid"))
return await self.render_template("settings/repositories/index.html", {"repositories": repositories})
return await self.render_template("settings/repositories/index.html", {"repositories": repositories, "user": user})