|
from snek.system.service import BaseService
|
|
import asyncio
|
|
import shutil
|
|
|
|
class RepositoryService(BaseService):
|
|
mapper_name = "repository"
|
|
|
|
async def delete(self, user_uid, name):
|
|
loop = asyncio.get_event_loop()
|
|
repository_path = (await self.services.user.get_repository_path(user_uid)).joinpath(name)
|
|
try:
|
|
await loop.run_in_executor(None, shutil.rmtree, repository_path)
|
|
except Exception as ex:
|
|
print(ex)
|
|
|
|
await super().delete(user_uid=user_uid, name=name)
|
|
|
|
|
|
async def exists(self, user_uid, name, **kwargs):
|
|
kwargs["user_uid"] = user_uid
|
|
kwargs["name"] = name
|
|
return await super().exists(**kwargs)
|
|
|
|
async def init(self, user_uid, name):
|
|
repository_path = await self.services.user.get_repository_path(user_uid)
|
|
if not repository_path.exists():
|
|
repository_path.mkdir(parents=True)
|
|
repository_path = repository_path.joinpath(name)
|
|
repository_path = str(repository_path)
|
|
if not repository_path.endswith(".git"):
|
|
repository_path += ".git"
|
|
command = ['git', 'init', '--bare', repository_path]
|
|
process = await asyncio.subprocess.create_subprocess_exec(
|
|
*command,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
return process.returncode == 0
|
|
|
|
async def create(self, user_uid, name,is_private=False):
|
|
if await self.exists(user_uid=user_uid, name=name):
|
|
return False
|
|
|
|
if not await self.init(user_uid=user_uid, name=name):
|
|
return False
|
|
|
|
model = await self.new()
|
|
model["user_uid"] = user_uid
|
|
model["name"] = name
|
|
model["is_private"] = is_private
|
|
return await self.save(model)
|