Update.
This commit is contained in:
parent
7b08e6a45e
commit
f35742fec3
src/snek
@ -9,7 +9,23 @@ class ContainerService(BaseService):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.compose_path = "snek-container-compose.yml"
|
||||
self.compose = ComposeFileManager(self.compose_path)
|
||||
self.compose = ComposeFileManager(self.compose_path,self.container_event_handler)
|
||||
self.event_listeners = {}
|
||||
|
||||
async def add_event_listener(self, name, event):
|
||||
if not name in self.event_listeners:
|
||||
self.event_listeners[name] = {}
|
||||
if not event in self.event_listeners[name]:
|
||||
self.event_listeners[name][event] = []
|
||||
queue = asyncio.Queue()
|
||||
self.event_listeners[name][event].append(queue)
|
||||
return queue.get
|
||||
|
||||
async def container_event_handler(self, name, event, data):
|
||||
event_listeners = self.event_listeners.get(name, {})
|
||||
queues = event_listeners.get(event, [])
|
||||
for queue in queues:
|
||||
await queue.put(data)
|
||||
|
||||
async def get_instances(self):
|
||||
return list(self.compose.list_instances())
|
||||
@ -20,6 +36,12 @@ class ContainerService(BaseService):
|
||||
async def get(self,channel_uid):
|
||||
return await self.compose.get_instance(await self.get_container_name(channel_uid))
|
||||
|
||||
async def stop(self, channel_uid):
|
||||
return await self.compose.stop(await self.get_container_name(channel_uid))
|
||||
|
||||
async def start(self, channel_uid):
|
||||
return await self.compose.start(await self.get_container_name(channel_uid))
|
||||
|
||||
async def get_status(self, channel_uid):
|
||||
return await self.compose.get_instance_status(await self.get_container_name(channel_uid))
|
||||
|
||||
|
@ -1,16 +1,15 @@
|
||||
import copy
|
||||
|
||||
import yaml
|
||||
|
||||
import yaml
|
||||
import copy
|
||||
import asyncio
|
||||
import subprocess
|
||||
|
||||
class ComposeFileManager:
|
||||
def __init__(self, compose_path="docker-compose.yml"):
|
||||
def __init__(self, compose_path="docker-compose.yml",event_handler=None):
|
||||
self.compose_path = compose_path
|
||||
self._load()
|
||||
self.running_instances = {}
|
||||
self.event_handler = event_handler
|
||||
|
||||
def _load(self):
|
||||
try:
|
||||
@ -100,8 +99,47 @@ class ComposeFileManager:
|
||||
stdout, _ = await proc.communicate()
|
||||
running_services = stdout.decode().split()
|
||||
return "running" if name in running_services else "stopped"
|
||||
# Storage size is not tracked in compose files; would need Docker API for that.
|
||||
|
||||
async def stop(self, name):
|
||||
"""Asynchronously stop a container by doing 'docker compose stop [name]'."""
|
||||
if name not in self.list_instances():
|
||||
return False
|
||||
status = await self.get_instance_status(name)
|
||||
if status != "running":
|
||||
return True
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"docker", "compose", "-f", self.compose_path, "stop", name,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
if name in self.running_instances:
|
||||
del self.running_instances[name]
|
||||
stdout, stderr = await proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"Failed to stop {name}: {stderr.decode()}")
|
||||
return stdout.decode()
|
||||
|
||||
async def start(self, name):
|
||||
"""Asynchronously start a container by doing 'docker compose up -d [name]'."""
|
||||
if name not in self.list_instances():
|
||||
return False
|
||||
|
||||
status = await self.get_instance_status(name)
|
||||
if name in self.running_instances and status == "running":
|
||||
return True
|
||||
else:
|
||||
del self.running_instances[name]
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"docker", "compose", "-f", self.compose_path, "up", "-d", name,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(f"Failed to start {name}: {stderr.decode()}")
|
||||
return stdout.decode()
|
||||
|
||||
# Example usage:
|
||||
# mgr = ComposeFileManager()
|
||||
@ -109,3 +147,5 @@ class ComposeFileManager:
|
||||
# print(mgr.list_instances())
|
||||
# mgr.duplicate_instance('web', 'web_copy')
|
||||
# mgr.remove_instance('web_copy')
|
||||
# await mgr.start('web')
|
||||
|
||||
|
@ -223,6 +223,24 @@ class RPCView(BaseView):
|
||||
}
|
||||
return result
|
||||
|
||||
async def start_container(self, channel_uid):
|
||||
self._require_login()
|
||||
channel_member = await self.services.channel_member.get(
|
||||
channel_uid=channel_uid, user_uid=self.user_uid
|
||||
)
|
||||
if not channel_member:
|
||||
raise Exception("Not allowed")
|
||||
return await self.services.container.start(channel_uid)
|
||||
|
||||
async def stop_container(self, channel_uid):
|
||||
self._require_login()
|
||||
channel_member = await self.services.channel_member.get(
|
||||
channel_uid=channel_uid, user_uid=self.user_uid
|
||||
)
|
||||
if not channel_member:
|
||||
raise Exception("Not allowed")
|
||||
return await self.services.container.stop(channel_uid)
|
||||
|
||||
async def get_container_status(self, channel_uid):
|
||||
self._require_login()
|
||||
channel_member = await self.services.channel_member.get(
|
||||
|
Loading…
Reference in New Issue
Block a user