diff --git a/pyproject.toml b/pyproject.toml index b6f1688..cc84391 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ "PyJWT", "multiavatar", "gitpython", - "uvloop" + "uvloop", + "humanize" ] [tool.setuptools.packages.find] diff --git a/src/snek/app.py b/src/snek/app.py index ceb7c9d..3e4ad5e 100644 --- a/src/snek/app.py +++ b/src/snek/app.py @@ -182,8 +182,8 @@ class Application(BaseApplication): self.router.add_view("/drive/{drive}.json", DriveView) self.router.add_view("/stats.json", StatsView) self.router.add_view("/user/{user}.html", UserView) - self.router.add_view("/repository/{username}/{repo_name}", RepositoryView) - self.router.add_view("/repository/{username}/{repo_name}/{rel_path:.*}", RepositoryView) + self.router.add_view("/repository/{username}/{repository}", RepositoryView) + self.router.add_view("/repository/{username}/{repository}/{path:.*}", RepositoryView) self.router.add_view("/settings/repositories/index.html", RepositoriesIndexView) self.router.add_view("/settings/repositories/create.html", RepositoriesCreateView) self.router.add_view("/settings/repositories/repository/{name}/update.html", RepositoriesUpdateView) diff --git a/src/snek/view/repository.py b/src/snek/view/repository.py index f7a2e9d..0c19142 100644 --- a/src/snek/view/repository.py +++ b/src/snek/view/repository.py @@ -1,15 +1,265 @@ -from snek.system.view import BaseView +import os +import mimetypes +import urllib.parse +from pathlib import Path +import humanize from aiohttp import web +from snek.system.view import BaseView +import asyncio +from git import Repo + + + + +class BareRepoNavigator: + def __init__(self, repo_path): + """Initialize the navigator with a bare repository path.""" + try: + self.repo = Repo(repo_path) + if not self.repo.bare: + print(f"Error: {repo_path} is not a bare repository.") + sys.exit(1) + except git.exc.InvalidGitRepositoryError: + print(f"Error: {repo_path} is not a valid Git repository.") + sys.exit(1) + except Exception as e: + print(f"Error opening repository: {str(e)}") + sys.exit(1) + + self.repo_path = repo_path + self.branches = list(self.repo.branches) + self.current_branch = None + self.current_commit = None + self.current_path = "" + self.history = [] + + def get_branches(self): + """Return a list of branch names in the repository.""" + return [branch.name for branch in self.branches] + + def set_branch(self, branch_name): + """Set the current branch.""" + try: + self.current_branch = self.repo.branches[branch_name] + self.current_commit = self.current_branch.commit + self.current_path = "" + self.history = [] + return True + except IndexError: + return False + + def get_commits(self, count=10): + """Get the latest commits on the current branch.""" + if not self.current_branch: + return [] + + commits = [] + for commit in self.repo.iter_commits(self.current_branch, max_count=count): + commits.append({ + 'hash': commit.hexsha, + 'short_hash': commit.hexsha[:7], + 'message': commit.message.strip(), + 'author': commit.author.name, + 'date': datetime.fromtimestamp(commit.committed_date).strftime('%Y-%m-%d %H:%M:%S') + }) + return commits + + def set_commit(self, commit_hash): + """Set the current commit by hash.""" + try: + self.current_commit = self.repo.commit(commit_hash) + self.current_path = "" + self.history = [] + return True + except ValueError: + return False + + def list_directory(self, path=""): + """List the contents of a directory in the current commit.""" + if not self.current_commit: + return {'dirs': [], 'files': []} + + dirs = [] + files = [] + + try: + # Get the tree at the current path + if path: + tree = self.current_commit.tree[path] + if not hasattr(tree, 'trees'): # It's a blob, not a tree + return {'dirs': [], 'files': [path]} + else: + tree = self.current_commit.tree + + # List directories and files + for item in tree: + if item.type == 'tree': + item_path = os.path.join(path, item.name) if path else item.name + dirs.append(item_path) + elif item.type == 'blob': + item_path = os.path.join(path, item.name) if path else item.name + files.append(item_path) + + dirs.sort() + files.sort() + return {'dirs': dirs, 'files': files} + + except KeyError: + return {'dirs': [], 'files': []} + + def get_file_content(self, file_path): + """Get the content of a file in the current commit.""" + if not self.current_commit: + return None + + try: + blob = self.current_commit.tree[file_path] + return blob.data_stream.read().decode('utf-8', errors='replace') + except (KeyError, UnicodeDecodeError): + try: + # Try to get as binary if text decoding fails + blob = self.current_commit.tree[file_path] + return blob.data_stream.read() + except: + return None + + def navigate_to(self, path): + """Navigate to a specific path, updating the current path.""" + if not self.current_commit: + return False + + try: + if path: + self.current_commit.tree[path] # Check if path exists + self.history.append(self.current_path) + self.current_path = path + return True + except KeyError: + return False + + def navigate_back(self): + """Navigate back to the previous path.""" + if self.history: + self.current_path = self.history.pop() + return True + return False + + + class RepositoryView(BaseView): - async def get(A): - G='type';H='name';I='.git';J='username';B=A.request.match_info[J];K=A.request.match_info['repo_name'];C=A.request.match_info.get('rel_path','') - if not B.count('-')==4:E=await A.services.user.get_by_username(B) - else:E=await A.services.user.get(B) - if not E:return web.HTTPNotFound() - B=E[J];M=await A.services.user.get_repository_path(E['uid']) - if C.endswith(I):C=C[:-4] - L=M.joinpath(K+I) - if not L.exists():return web.HTTPNotFound() - import os;from git import Repo;N=Repo(L.joinpath(C));F=[];O=[];P=N.head.commit - for D in P.tree.traverse():F.append({H:D.name,'mode':D.mode,G:D.type,'path':D.path,'size':D.size}) - sorted(F,key=lambda x:x[H]);sorted(F,key=lambda x:x[G],reverse=True);Q=f"{B}/{C}"[:-4];return await A.render_template('repository.html',dict(username=B,repo_name=K,rel_path=C,full_path=Q,files=F,directories=O)) \ No newline at end of file + + login_required = True + + def checkout_bare_repo(self, bare_repo_path: Path, target_path: Path, ref: str = 'HEAD'): + repo = Repo(bare_repo_path) + assert repo.bare, "Repository is not bare." + + commit = repo.commit(ref) + tree = commit.tree + + for blob in tree.traverse(): + target_file = target_path / blob.path + + target_file.parent.mkdir(parents=True, exist_ok=True) + print(blob.path) + + with open(target_file, 'wb') as f: + f.write(blob.data_stream.read()) + + + async def get(self): + + base_repo_path = Path("drive/repositories") + + authenticated_user_id = self.session.get("uid") + + username = self.request.match_info.get('username') + repo_name = self.request.match_info.get('repository') + rel_path = self.request.match_info.get('path', '') + user = None + if not username.count("-") == 4: + user = await self.app.services.user.get(username=username) + if not user: + return web.Response(text="404 Not Found", status=404) + username = user["username"] + else: + user = await self.app.services.user.get(uid=username) + + repo = await self.app.services.repository.get(name=repo_name, user_uid=user["uid"]) + if not repo: + return web.Response(text="404 Not Found", status=404) + if repo['is_private'] and authenticated_user_id != repo['uid']: + return web.Response(text="404 Not Found", status=404) + + repo_root_base = (base_repo_path / user['uid'] / (repo_name + ".git")).resolve() + repo_root = (base_repo_path / user['uid'] / repo_name).resolve() + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, + self.checkout_bare_repo, repo_root_base, repo_root + ) + except: + pass + + if not repo_root.exists() or not repo_root.is_dir(): + return web.Response(text="404 Not Found", status=404) + + safe_rel_path = os.path.normpath(rel_path).lstrip(os.sep) + abs_path = (repo_root / safe_rel_path).resolve() + + if not abs_path.exists() or not abs_path.is_relative_to(repo_root): + return web.Response(text="404 Not Found", status=404) + + if abs_path.is_dir(): + return web.Response(text=self.render_directory(abs_path, username, repo_name, safe_rel_path), content_type='text/html') + else: + return web.Response(text=self.render_file(abs_path), content_type='text/html') + + def render_directory(self, abs_path, username, repo_name, safe_rel_path): + entries = sorted(abs_path.iterdir(), key=lambda p: (not p.is_dir(), p.name.lower())) + items = [] + + if safe_rel_path: + parent_path = Path(safe_rel_path).parent + parent_link = f"/repository/{username}/{repo_name}/{parent_path}".rstrip('/') + items.append(f'
  • ⬅️ ..
  • ') + + for entry in entries: + link_path = urllib.parse.quote(str(Path(safe_rel_path) / entry.name)) + link = f"/repository/{username}/{repo_name}/{link_path}".rstrip('/') + display = entry.name + ('/' if entry.is_dir() else '') + size = '' if entry.is_dir() else humanize.naturalsize(entry.stat().st_size) + icon = self.get_icon(entry) + items.append(f'
  • {icon} {display} {size}
  • ') + + html = f""" + + 📁 {repo_name}/{safe_rel_path} + +

    📁 {username}/{repo_name}/{safe_rel_path}

    + + + + """ + return html + + def render_file(self, abs_path): + try: + with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + return f"
    {content}
    " + except Exception as e: + return f"

    Error

    {e}
    " + + def get_icon(self, file): + if file.is_dir(): return "📁" + mime = mimetypes.guess_type(file.name)[0] or '' + if mime.startswith("image"): return "🖼️" + if mime.startswith("text"): return "📄" + if mime.startswith("audio"): return "🎵" + if mime.startswith("video"): return "🎬" + if file.name.endswith(".py"): return "🐍" + return "📦" +