#!/usr/bin/env python3 import asyncio import shutil import uuid import os import pty import struct import fcntl import termios import signal import json from pathlib import Path from fastapi import FastAPI, WebSocket, HTTPException from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse import uvicorn app = FastAPI() active_sessions = {} @app.get("/") async def root(): return HTMLResponse(""" Preparing Environment

Preparing Environment

Please wait while we set up your workspace...

""") @app.get("/prepare") async def prepare(): session_id = str(uuid.uuid4()) system_directory = Path("system") workspace_directory = Path(f"workspace/{session_id}") if not system_directory.exists(): raise HTTPException(status_code=500, detail="System directory not found") shutil.copytree(system_directory, workspace_directory) active_sessions[session_id] = {"workspace": workspace_directory} return {"session_id": session_id} @app.get("/terminal/{session_id}") async def terminal(session_id: str): if session_id not in active_sessions: return RedirectResponse(url="/") return HTMLResponse(f""" Terminal
""") def set_winsize(fd, rows, cols): winsize = struct.pack("HHHH", rows, cols, 0, 0) fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) @app.websocket("/ws/{session_id}") async def websocket_endpoint(websocket: WebSocket, session_id: str): if session_id not in active_sessions: await websocket.close(code=1008) return await websocket.accept() workspace_directory = active_sessions[session_id]["workspace"] master_fd, slave_fd = pty.openpty() event_loop = asyncio.get_event_loop() process = await asyncio.create_subprocess_exec( "./manage.py", "start", stdin=slave_fd, stdout=slave_fd, stderr=slave_fd, cwd=str(workspace_directory), start_new_session=True ) os.close(slave_fd) async def read_output(): try: while True: data = await event_loop.run_in_executor(None, os.read, master_fd, 1024) if not data: break await websocket.send_text(data.decode('utf-8', errors='replace')) except Exception: await websocket.close() async def handle_input(): try: while True: message = await websocket.receive_text() parsed_message = json.loads(message) if parsed_message['type'] == 'input': await event_loop.run_in_executor(None, os.write, master_fd, parsed_message['data'].encode('utf-8')) elif parsed_message['type'] == 'resize': set_winsize(master_fd, parsed_message['data']['rows'], parsed_message['data']['cols']) except Exception: await websocket.close() try: await asyncio.gather(read_output(), handle_input(), return_exceptions=True) except Exception as e: print(e) await websocket.close() try: os.close(master_fd) except Exception: pass try: process.kill() except Exception: pass await websocket.close() @app.get("/files/{session_id}/{path:path}") async def browse_files(session_id: str, path: str = ""): if session_id not in active_sessions: raise HTTPException(status_code=404, detail="Session not found") workspace_directory = active_sessions[session_id]["workspace"] target_path = (workspace_directory / path).resolve() if not str(target_path).startswith(str(workspace_directory.resolve())): raise HTTPException(status_code=403, detail="Access denied") if not target_path.exists(): raise HTTPException(status_code=404, detail="Path not found") if target_path.is_file(): return FileResponse(target_path) items = [] for item in sorted(target_path.iterdir()): rel_path = item.absolute().relative_to(workspace_directory.absolute()) items.append({ "name": item.name, "is_dir": item.is_dir(), "path": str(rel_path) }) html = f""" File Browser

File Browser

Path: /{path or ''}
""" if path: parent = str(Path(path).parent) if Path(path).parent != Path('.') else '' html += f'' for item in items: name_display = item['name'] + ('/' if item['is_dir'] else '') item_type = 'Directory' if item['is_dir'] else 'File' css_class = 'dir' if item['is_dir'] else 'file' html += f'' html += """
Name Type
..Directory
{name_display}{item_type}
""" return HTMLResponse(html) if __name__ == "__main__": signal.signal(signal.SIGINT, lambda s, f: os._exit(0)) import argparse parser = argparse.ArgumentParser(description="Run the application server.") parser.add_argument("--port", type=int, default=8240, help="Port number to run the server on.") arguments = parser.parse_args() uvicorn.run(app, host="0.0.0.0", port=arguments.port)