#!/usr/bin/env python3
import asyncio
import shutil
import uuid
import os
import pty
import struct
import fcntl
import termios
import signal
import json
from pathlib import Path
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
import uvicorn
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# In-memory session registry: session_id (str) -> {"workspace": Path to that
# session's private copy of the system directory}. Entries are added by the
# /prepare handler and looked up by the terminal, websocket and file routes.
active_sessions = {}
@app.get("/")
async def root():
    """Return the static "Preparing Environment" placeholder page as HTML."""
    landing_page = """
Preparing Environment
Preparing Environment
Please wait while we set up your workspace...
"""
    return HTMLResponse(landing_page)
@app.get("/prepare")
async def prepare():
    """Create a new session backed by a private copy of the ``system`` directory.

    A fresh UUID4 is used as the session id, ``system`` is copied to
    ``workspace/<session_id>``, and the session is registered in
    ``active_sessions`` only after the copy has succeeded.

    Returns:
        A dict ``{"session_id": <id>}``.

    Raises:
        HTTPException: 500 when the ``system`` directory does not exist.
    """
    session_id = str(uuid.uuid4())
    system_directory = Path("system")
    workspace_directory = Path("workspace") / session_id
    if not system_directory.exists():
        raise HTTPException(status_code=500, detail="System directory not found")
    # copytree is blocking filesystem work; running it directly inside this
    # coroutine would stall the event loop (and every open terminal websocket)
    # for the duration of the copy, so hand it to the default thread pool.
    await asyncio.get_running_loop().run_in_executor(
        None, shutil.copytree, system_directory, workspace_directory
    )
    active_sessions[session_id] = {"workspace": workspace_directory}
    return {"session_id": session_id}
@app.get("/terminal/{session_id}")
async def terminal(session_id: str):
    """Return the terminal page for a registered session, else redirect to ``/``."""
    if session_id in active_sessions:
        terminal_page = f"""
Terminal
"""
        return HTMLResponse(terminal_page)
    # Unknown session ids are sent back to the root page.
    return RedirectResponse(url="/")
def set_winsize(fd, rows, cols):
    """Set the terminal window size of ``fd`` to ``rows`` by ``cols``.

    The size is packed as four native unsigned shorts (rows, cols, 0, 0) and
    applied with the ``TIOCSWINSZ`` ioctl; the two trailing fields are always
    written as zero.

    Raises:
        struct.error: if ``rows`` or ``cols`` does not fit an unsigned short.
        OSError: if the ioctl is rejected for ``fd``.
    """
    packed_size = struct.Struct("HHHH").pack(rows, cols, 0, 0)
    fcntl.ioctl(fd, termios.TIOCSWINSZ, packed_size)
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """Bridge a websocket to ``./manage.py start`` running on a pseudo-terminal.

    Unknown session ids are rejected with close code 1008. Otherwise the
    command is started in the session's workspace with the pty slave as its
    stdin/stdout/stderr, and two pumps run concurrently:

    * pty output is read in 1024-byte chunks and sent to the client as text;
    * client messages are JSON objects: ``{"type": "input", "data": <str>}``
      is written to the pty, ``{"type": "resize", "data": {"rows": ..,
      "cols": ..}}`` changes the window size.

    As soon as either pump stops (client gone, child exited, malformed
    message), the child is killed, the pty is closed and the websocket is
    closed exactly once.
    """
    # Function-scope imports: the module header does not provide these names.
    import codecs
    from fastapi import WebSocketDisconnect

    if session_id not in active_sessions:
        await websocket.close(code=1008)
        return
    await websocket.accept()
    workspace_directory = active_sessions[session_id]["workspace"]
    event_loop = asyncio.get_running_loop()

    master_fd, slave_fd = pty.openpty()
    try:
        process = await asyncio.create_subprocess_exec(
            "./manage.py", "start",
            stdin=slave_fd,
            stdout=slave_fd,
            stderr=slave_fd,
            cwd=str(workspace_directory),
            start_new_session=True,
        )
    except BaseException:
        # Spawn failed: nothing will ever use the master side, so release it.
        os.close(master_fd)
        raise
    finally:
        # The child holds its own copies of the slave; the parent's is not needed.
        os.close(slave_fd)

    async def read_output():
        # An incremental decoder keeps a multi-byte UTF-8 character that is
        # split across two reads intact instead of turning it into U+FFFD.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while True:
            try:
                data = await event_loop.run_in_executor(None, os.read, master_fd, 1024)
            except OSError:
                # Reading the master fails once the slave side is gone;
                # treat that as the normal end of the stream.
                break
            if not data:
                break
            text = decoder.decode(data)
            if text:
                await websocket.send_text(text)
        tail = decoder.decode(b"", final=True)
        if tail:
            await websocket.send_text(tail)

    async def handle_input():
        while True:
            message = await websocket.receive_text()
            parsed_message = json.loads(message)
            if parsed_message['type'] == 'input':
                payload = parsed_message['data'].encode('utf-8')
                await event_loop.run_in_executor(None, os.write, master_fd, payload)
            elif parsed_message['type'] == 'resize':
                size = parsed_message['data']
                set_winsize(master_fd, size['rows'], size['cols'])

    reader = asyncio.ensure_future(read_output())
    writer = asyncio.ensure_future(handle_input())
    drain_timeout = 2.0  # seconds granted to the reader to observe the child's exit
    try:
        # Stop as soon as EITHER side finishes; waiting for both would leave
        # the child running after a disconnect until it happened to print.
        await asyncio.wait({reader, writer}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        writer.cancel()
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # the child exited on its own in the meantime
        # Let the blocking os.read in the worker thread return before the
        # descriptor is closed, so the fd is not closed under an active read.
        await asyncio.wait({reader}, timeout=drain_timeout)
        reader.cancel()
        outcomes = await asyncio.gather(reader, writer, return_exceptions=True)
        for outcome in outcomes:
            # A client disconnect is the expected way for a session to end;
            # anything else is reported instead of being silently dropped.
            if isinstance(outcome, Exception) and not isinstance(outcome, WebSocketDisconnect):
                print(outcome)
        os.close(master_fd)
        try:
            await websocket.close()
        except (RuntimeError, WebSocketDisconnect):
            pass  # the connection is already closed or the client is gone
@app.get("/files/{session_id}/{path:path}")
async def browse_files(session_id: str, path: str = ""):
    """Serve a file, or an HTML listing of a directory, from a session workspace.

    Args:
        session_id: Key into ``active_sessions``.
        path: Location relative to the session's workspace directory.

    Returns:
        A ``FileResponse`` when the target is a regular file, otherwise an
        ``HTMLResponse`` listing the directory entries in sorted order.

    Raises:
        HTTPException: 404 for an unknown session or a missing path, 403 when
            the resolved path lies outside the session workspace.
    """
    # Function-scope import: the module header does not import ``html``.
    from html import escape

    if session_id not in active_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    workspace_root = active_sessions[session_id]["workspace"].resolve()
    target_path = (workspace_root / path).resolve()
    # Compare path components, not string prefixes: a prefix test would also
    # accept a sibling directory whose name merely starts with the workspace name.
    try:
        target_path.relative_to(workspace_root)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied") from None
    if not target_path.exists():
        raise HTTPException(status_code=404, detail="Path not found")
    if target_path.is_file():
        return FileResponse(target_path)

    # The request path and the file names are user-controlled, so they are
    # HTML-escaped before being placed into the page.
    fragments = [f"""
File Browser
File Browser
Path: /{escape(path or '')}
Name
Type
"""]
    if path:
        fragments.append('.. Directory ')
    for item in sorted(target_path.iterdir()):
        is_dir = item.is_dir()
        name_display = item.name + ('/' if is_dir else '')
        item_type = 'Directory' if is_dir else 'File'
        fragments.append(f'{escape(name_display)} {item_type} ')
    fragments.append("""
""")
    return HTMLResponse("".join(fragments))
if __name__ == "__main__":
    # Script entry point: make Ctrl-C end the process at once, read the port
    # from the command line, then serve the app on all interfaces.
    import argparse

    # os._exit skips normal interpreter shutdown, so SIGINT terminates the
    # process immediately with status 0.
    signal.signal(signal.SIGINT, lambda _signum, _frame: os._exit(0))

    cli = argparse.ArgumentParser(description="Run the application server.")
    cli.add_argument(
        "--port",
        type=int,
        default=8240,
        help="Port number to run the server on.",
    )
    arguments = cli.parse_args()
    uvicorn.run(app, host="0.0.0.0", port=arguments.port)