import asyncio import os import asyncssh # SSH Server Configuration HOST = "0.0.0.0" PORT = 2225 USERNAME = "user" PASSWORD = "password" SHELL = "/bin/sh" # Change to another shell if needed class CustomSSHServer(asyncssh.SSHServer): def connection_made(self, conn): print(f"New connection from {conn.get_extra_info('peername')}") def connection_lost(self, exc): print("Client disconnected") def password_auth_supported(self): return True def validate_password(self, username, password): return username == USERNAME and password == PASSWORD async def custom_bash_process(process): """Spawns a custom bash shell process""" env = os.environ.copy() env["TERM"] = "xterm-256color" # Start the Bash shell bash_proc = await asyncio.create_subprocess_exec( SHELL, "-i", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, ) async def read_output(): while True: data = await bash_proc.stdout.read(1) if not data: break process.stdout.write(data) async def read_input(): while True: data = await process.stdin.read(1) if not data: break bash_proc.stdin.write(data) await asyncio.gather(read_output(), read_input()) async def start_ssh_server(): """Starts the AsyncSSH server with Bash""" await asyncssh.create_server( lambda: CustomSSHServer(), host=HOST, port=PORT, server_host_keys=["ssh_host_key"], process_factory=custom_bash_process, ) print(f"SSH server running on {HOST}:{PORT}") await asyncio.Future() # Keep running if __name__ == "__main__": try: asyncio.run(start_ssh_server()) except (OSError, asyncssh.Error) as e: print(f"Error starting SSH server: {e}")