This commit is contained in:
retoor 2025-05-09 15:55:51 +02:00
parent 1616e4edb9
commit 44ac1d2bfa
3 changed files with 267 additions and 16 deletions

View File

@ -33,7 +33,8 @@ dependencies = [
"PyJWT",
"multiavatar",
"gitpython",
"uvloop"
"uvloop",
"humanize"
]
[tool.setuptools.packages.find]

View File

@ -182,8 +182,8 @@ class Application(BaseApplication):
self.router.add_view("/drive/{drive}.json", DriveView)
self.router.add_view("/stats.json", StatsView)
self.router.add_view("/user/{user}.html", UserView)
self.router.add_view("/repository/{username}/{repo_name}", RepositoryView)
self.router.add_view("/repository/{username}/{repo_name}/{rel_path:.*}", RepositoryView)
self.router.add_view("/repository/{username}/{repository}", RepositoryView)
self.router.add_view("/repository/{username}/{repository}/{path:.*}", RepositoryView)
self.router.add_view("/settings/repositories/index.html", RepositoriesIndexView)
self.router.add_view("/settings/repositories/create.html", RepositoriesCreateView)
self.router.add_view("/settings/repositories/repository/{name}/update.html", RepositoriesUpdateView)

View File

@ -1,15 +1,265 @@
from snek.system.view import BaseView
import os
import mimetypes
import urllib.parse
from pathlib import Path
import humanize
from aiohttp import web
from snek.system.view import BaseView
import asyncio
from git import Repo
class BareRepoNavigator:
def __init__(self, repo_path):
"""Initialize the navigator with a bare repository path."""
try:
self.repo = Repo(repo_path)
if not self.repo.bare:
print(f"Error: {repo_path} is not a bare repository.")
sys.exit(1)
except git.exc.InvalidGitRepositoryError:
print(f"Error: {repo_path} is not a valid Git repository.")
sys.exit(1)
except Exception as e:
print(f"Error opening repository: {str(e)}")
sys.exit(1)
self.repo_path = repo_path
self.branches = list(self.repo.branches)
self.current_branch = None
self.current_commit = None
self.current_path = ""
self.history = []
def get_branches(self):
"""Return a list of branch names in the repository."""
return [branch.name for branch in self.branches]
def set_branch(self, branch_name):
"""Set the current branch."""
try:
self.current_branch = self.repo.branches[branch_name]
self.current_commit = self.current_branch.commit
self.current_path = ""
self.history = []
return True
except IndexError:
return False
def get_commits(self, count=10):
"""Get the latest commits on the current branch."""
if not self.current_branch:
return []
commits = []
for commit in self.repo.iter_commits(self.current_branch, max_count=count):
commits.append({
'hash': commit.hexsha,
'short_hash': commit.hexsha[:7],
'message': commit.message.strip(),
'author': commit.author.name,
'date': datetime.fromtimestamp(commit.committed_date).strftime('%Y-%m-%d %H:%M:%S')
})
return commits
def set_commit(self, commit_hash):
"""Set the current commit by hash."""
try:
self.current_commit = self.repo.commit(commit_hash)
self.current_path = ""
self.history = []
return True
except ValueError:
return False
def list_directory(self, path=""):
"""List the contents of a directory in the current commit."""
if not self.current_commit:
return {'dirs': [], 'files': []}
dirs = []
files = []
try:
# Get the tree at the current path
if path:
tree = self.current_commit.tree[path]
if not hasattr(tree, 'trees'): # It's a blob, not a tree
return {'dirs': [], 'files': [path]}
else:
tree = self.current_commit.tree
# List directories and files
for item in tree:
if item.type == 'tree':
item_path = os.path.join(path, item.name) if path else item.name
dirs.append(item_path)
elif item.type == 'blob':
item_path = os.path.join(path, item.name) if path else item.name
files.append(item_path)
dirs.sort()
files.sort()
return {'dirs': dirs, 'files': files}
except KeyError:
return {'dirs': [], 'files': []}
def get_file_content(self, file_path):
"""Get the content of a file in the current commit."""
if not self.current_commit:
return None
try:
blob = self.current_commit.tree[file_path]
return blob.data_stream.read().decode('utf-8', errors='replace')
except (KeyError, UnicodeDecodeError):
try:
# Try to get as binary if text decoding fails
blob = self.current_commit.tree[file_path]
return blob.data_stream.read()
except:
return None
def navigate_to(self, path):
"""Navigate to a specific path, updating the current path."""
if not self.current_commit:
return False
try:
if path:
self.current_commit.tree[path] # Check if path exists
self.history.append(self.current_path)
self.current_path = path
return True
except KeyError:
return False
def navigate_back(self):
"""Navigate back to the previous path."""
if self.history:
self.current_path = self.history.pop()
return True
return False
class RepositoryView(BaseView):
async def get(A):
G='type';H='name';I='.git';J='username';B=A.request.match_info[J];K=A.request.match_info['repo_name'];C=A.request.match_info.get('rel_path','')
if not B.count('-')==4:E=await A.services.user.get_by_username(B)
else:E=await A.services.user.get(B)
if not E:return web.HTTPNotFound()
B=E[J];M=await A.services.user.get_repository_path(E['uid'])
if C.endswith(I):C=C[:-4]
L=M.joinpath(K+I)
if not L.exists():return web.HTTPNotFound()
import os;from git import Repo;N=Repo(L.joinpath(C));F=[];O=[];P=N.head.commit
for D in P.tree.traverse():F.append({H:D.name,'mode':D.mode,G:D.type,'path':D.path,'size':D.size})
sorted(F,key=lambda x:x[H]);sorted(F,key=lambda x:x[G],reverse=True);Q=f"{B}/{C}"[:-4];return await A.render_template('repository.html',dict(username=B,repo_name=K,rel_path=C,full_path=Q,files=F,directories=O))
login_required = True
def checkout_bare_repo(self, bare_repo_path: Path, target_path: Path, ref: str = 'HEAD'):
repo = Repo(bare_repo_path)
assert repo.bare, "Repository is not bare."
commit = repo.commit(ref)
tree = commit.tree
for blob in tree.traverse():
target_file = target_path / blob.path
target_file.parent.mkdir(parents=True, exist_ok=True)
print(blob.path)
with open(target_file, 'wb') as f:
f.write(blob.data_stream.read())
async def get(self):
base_repo_path = Path("drive/repositories")
authenticated_user_id = self.session.get("uid")
username = self.request.match_info.get('username')
repo_name = self.request.match_info.get('repository')
rel_path = self.request.match_info.get('path', '')
user = None
if not username.count("-") == 4:
user = await self.app.services.user.get(username=username)
if not user:
return web.Response(text="404 Not Found", status=404)
username = user["username"]
else:
user = await self.app.services.user.get(uid=username)
repo = await self.app.services.repository.get(name=repo_name, user_uid=user["uid"])
if not repo:
return web.Response(text="404 Not Found", status=404)
if repo['is_private'] and authenticated_user_id != repo['uid']:
return web.Response(text="404 Not Found", status=404)
repo_root_base = (base_repo_path / user['uid'] / (repo_name + ".git")).resolve()
repo_root = (base_repo_path / user['uid'] / repo_name).resolve()
try:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None,
self.checkout_bare_repo, repo_root_base, repo_root
)
except:
pass
if not repo_root.exists() or not repo_root.is_dir():
return web.Response(text="404 Not Found", status=404)
safe_rel_path = os.path.normpath(rel_path).lstrip(os.sep)
abs_path = (repo_root / safe_rel_path).resolve()
if not abs_path.exists() or not abs_path.is_relative_to(repo_root):
return web.Response(text="404 Not Found", status=404)
if abs_path.is_dir():
return web.Response(text=self.render_directory(abs_path, username, repo_name, safe_rel_path), content_type='text/html')
else:
return web.Response(text=self.render_file(abs_path), content_type='text/html')
def render_directory(self, abs_path, username, repo_name, safe_rel_path):
entries = sorted(abs_path.iterdir(), key=lambda p: (not p.is_dir(), p.name.lower()))
items = []
if safe_rel_path:
parent_path = Path(safe_rel_path).parent
parent_link = f"/repository/{username}/{repo_name}/{parent_path}".rstrip('/')
items.append(f'<li><a href="{parent_link}">⬅️ ..</a></li>')
for entry in entries:
link_path = urllib.parse.quote(str(Path(safe_rel_path) / entry.name))
link = f"/repository/{username}/{repo_name}/{link_path}".rstrip('/')
display = entry.name + ('/' if entry.is_dir() else '')
size = '' if entry.is_dir() else humanize.naturalsize(entry.stat().st_size)
icon = self.get_icon(entry)
items.append(f'<li>{icon} <a href="{link}">{display}</a> {size}</li>')
html = f"""
<html>
<head><title>πŸ“ {repo_name}/{safe_rel_path}</title></head>
<body>
<h2>πŸ“ {username}/{repo_name}/{safe_rel_path}</h2>
<ul>
{''.join(items)}
</ul>
</body>
</html>
"""
return html
def render_file(self, abs_path):
try:
with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return f"<pre>{content}</pre>"
except Exception as e:
return f"<h1>Error</h1><pre>{e}</pre>"
def get_icon(self, file):
if file.is_dir(): return "πŸ“"
mime = mimetypes.guess_type(file.name)[0] or ''
if mime.startswith("image"): return "πŸ–ΌοΈ"
if mime.startswith("text"): return "πŸ“„"
if mime.startswith("audio"): return "🎡"
if mime.startswith("video"): return "🎬"
if file.name.endswith(".py"): return "🐍"
return "πŸ“¦"