diff --git a/src/snek/app.py b/src/snek/app.py index 765e4fb..37218a6 100644 --- a/src/snek/app.py +++ b/src/snek/app.py @@ -51,6 +51,7 @@ from snek.view.register import RegisterView from snek.view.repository import RepositoryView from snek.view.rpc import RPCView from snek.view.search_user import SearchUserView +from snek.view.container import ContainerView from snek.view.settings.containers import ( ContainersCreateView, ContainersDeleteView, @@ -250,6 +251,7 @@ class Application(BaseApplication): show_index=True, ) self.router.add_view("/profiler.html", profiler_handler) + self.router.add_view("/container/sock/{channel_uid}.json", ContainerView) self.router.add_view("/about.html", AboutHTMLView) self.router.add_view("/about.md", AboutMDView) self.router.add_view("/logout.json", LogoutView) diff --git a/src/snek/service/container.py b/src/snek/service/container.py index da3355d..2c989c8 100644 --- a/src/snek/service/container.py +++ b/src/snek/service/container.py @@ -12,25 +12,26 @@ class ContainerService(BaseService): self.compose = ComposeFileManager(self.compose_path,self.container_event_handler) self.event_listeners = {} - async def add_event_listener(self, name, event): + async def add_event_listener(self, name, event,event_handler): if not name in self.event_listeners: self.event_listeners[name] = {} if not event in self.event_listeners[name]: self.event_listeners[name][event] = [] - queue = asyncio.Queue() - self.event_listeners[name][event].append(queue) - return queue.get + self.event_listeners[name][event].append(event_handler) async def container_event_handler(self, name, event, data): event_listeners = self.event_listeners.get(name, {}) - queues = event_listeners.get(event, []) - for queue in queues: - await queue.put(data) + handlers = event_listeners.get(event, []) + for handler in handlers: + if not await handler(data): + handlers.remove(handler) async def get_instances(self): return list(self.compose.list_instances()) async def get_container_name(self, channel_uid): + if channel_uid.startswith("channel-"): + return channel_uid return f"channel-{channel_uid}" async def get(self,channel_uid): @@ -45,6 +46,9 @@ class ContainerService(BaseService): async def get_status(self, channel_uid): return await self.compose.get_instance_status(await self.get_container_name(channel_uid)) + async def write_stdin(self, channel_uid, data): + return await self.compose.write_stdin(await self.get_container_name(channel_uid), data) + async def create( self, channel_uid, diff --git a/src/snek/static/base.css b/src/snek/static/base.css index c9a2de9..fa74fd3 100644 --- a/src/snek/static/base.css +++ b/src/snek/static/base.css @@ -9,6 +9,10 @@ html { height: 100%; } +.hidden { + display: none; +} + .gallery { padding: 50px; height: auto; diff --git a/src/snek/static/container.js b/src/snek/static/container.js new file mode 100644 index 0000000..7a2c2ab --- /dev/null +++ b/src/snek/static/container.js @@ -0,0 +1,94 @@ +import { app } from "./app.js"; +import { EventHandler } from "./event-handler.js"; + +export class Container extends EventHandler{ + status = "unknown" + cpus = 0 + memory = "0m" + image = "unknown:unknown" + name = null + channelUid = null + log = false + bytesSent = 0 + bytesReceived = 0 + _container = null + render(el){ + if(this._container == null){ + this._container = el + this.terminal.open(this._container) + + this.terminal.onData(data => this.ws.send(new TextEncoder().encode(data))); + } + this._fitAddon.fit(); + this.refresh() + + } + refresh(){ + this._fitAddon.fit(); + this.terminal.write("\x0C"); + } + toggle(){ + this._container.classList.toggle("hidden") + this.refresh() + } + + constructor(channelUid,log){ + super() + this.terminal = new Terminal({ cursorBlink: true }); + this._fitAddon = new FitAddon.FitAddon(); + this.terminal.loadAddon(this._fitAddon); + window.addEventListener("resize", () => this._fitAddon.fit()); + this.log = log ? true : false + this.channelUid = channelUid + this.update() + this.addEventListener("stdout", (data) => { + this.bytesReceived += data.length + if(this.log){ + console.log(`Container ${this.name}: ${data}`) + } + const fixedData = new Uint8Array(data); + this.terminal.write(new TextDecoder().decode(fixedData)); + + }) + this.ws = new WebSocket(`/container/sock/${channelUid}.json`) + this.ws.binaryType = "arraybuffer"; // Support binary data + this.ws.onmessage = (event) => { + this.emit("stdout", event.data) + } + this.ws.onopen = () => { + this.refresh() + } + window.container = this + + } + async start(){ + const result = await app.rpc.startContainer(this.channelUid) + await this.refresh() + return result && this.status == 'running' + } + async stop(){ + const result = await app.rpc.stopContainer(this.channelUid) + await this.refresh() + return result && this.status == 'stopped' + } + async write(data){ + await this.ws.send(data) + this.bytesSent += data.length + return true + } + async update(){ + + const container = await app.rpc.getContainer(this.channelUid) + this.status = container["status"] + this.cpus = container["cpus"] + this.memory = container["memory"] + this.image = container["image"] + this.name = container["name"] + } + +} +/* +window.getContainer = function(){ + return new Container(app.channelUid) +}*/ + diff --git a/src/snek/system/docker.py b/src/snek/system/docker.py index b1b3b5f..db218df 100644 --- a/src/snek/system/docker.py +++ b/src/snek/system/docker.py @@ -1,5 +1,5 @@ import copy - +import json import yaml import asyncio import subprocess @@ -25,6 +25,25 @@ class ComposeFileManager: def list_instances(self): return list(self.compose.get("services", {}).keys()) + async def _create_readers(self, container_name): + instance = await self.get_instance(container_name) + if not instance: + return False + proc = self.running_instances.get(container_name) + if not proc: + return False + async def reader(event_handler,stream): + while True: + line = await stream.readline() + print("XXX",line) + if not line: + break + await event_handler(container_name,"stdout",line) + await self.stop(container_name) + asyncio.create_task(reader(self.event_handler,proc.stdout)) + asyncio.create_task(reader(self.event_handler,proc.stderr)) + + def create_instance( self, name, @@ -38,8 +57,7 @@ class ComposeFileManager: service = { "image": image, } - if command: - service["command"] = command + service["command"] = command or "tail -f /dev/null" if cpus or memory: service["deploy"] = {"resources": {"limits": {}}} if cpus: @@ -68,7 +86,7 @@ class ComposeFileManager: instance['status'] = await self.get_instance_status(name) print("INSTANCE",instance) - return instance + return json.loads(json.dumps(instance,default=str)) def duplicate_instance(self, name, new_name): orig = self.get_instance(name) @@ -100,6 +118,21 @@ class ComposeFileManager: running_services = stdout.decode().split() return "running" if name in running_services else "stopped" + async def write_stdin(self, name, data): + await self.event_handler(name, "stdin", data) + proc = self.running_instances.get(name) + print("Found proc:",proc) + print(name,data) + if not proc: + return False + try: + proc.stdin.write(data.encode()) + return True + except Exception as ex: + print(ex) + await self.stop(name) + return False + async def stop(self, name): """Asynchronously stop a container by doing 'docker compose stop [name]'.""" if name not in self.list_instances(): @@ -118,7 +151,12 @@ class ComposeFileManager: stdout, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"Failed to stop {name}: {stderr.decode()}") - return stdout.decode() + if stdout: + await self.event_handler(name,"stdout",stdout) + return stdout.decode(errors="ignore") + + await self.event_handler(name,"stdout",stderr) + return stderr.decode(errors="ignore") async def start(self, name): """Asynchronously start a container by doing 'docker compose up -d [name]'.""" @@ -126,20 +164,41 @@ class ComposeFileManager: return False status = await self.get_instance_status(name) - if name in self.running_instances and status == "running": + if name in self.running_instances and status == "running" and self.running_instances.get(name): + await self.stop(name) return True - else: + elif name in self.running_instances: del self.running_instances[name] proc = await asyncio.create_subprocess_exec( - "docker", "compose", "-f", self.compose_path, "up", "-d", name, + "docker", "compose", "-f", self.compose_path, "up", name, "-d", + stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) - stdout, stderr = await proc.communicate() + stdout,stderr = await proc.communicate() + print(stdout, stderr) if proc.returncode != 0: - raise RuntimeError(f"Failed to start {name}: {stderr.decode()}") - return stdout.decode() + print(f"Failed to start {name}: {stderr.decode(errors='ignore')}") + return False + + proc = await asyncio.create_subprocess_exec( + "docker", "compose", "-f", self.compose_path, "exec", name, "/bin/bash", + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) +# stdin,stderr = await proc.communicate() + self.running_instances[name] = proc + #if stdout: + # await self.event_handler(name, "stdout", stdout) + #if stderr: + # await self.event_handler(name,"stdout",stderr) + + await self._create_readers(name) + + return True + #return stdout and stdout.decode(errors="ignore") or stderr.decode(errors="ignore") # Example usage: # mgr = ComposeFileManager() diff --git a/src/snek/templates/app.html b/src/snek/templates/app.html index 628bf92..345d7e6 100644 --- a/src/snek/templates/app.html +++ b/src/snek/templates/app.html @@ -17,6 +17,11 @@ + + + + + @@ -50,7 +55,13 @@ {% endblock %} -