import asyncio import getpass import sys import logging import traceback from typing import Dict, Any import aiohttp from snekbot.rpc import RPC from snekbot.bot import Bot logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger: logging.Logger = logging.getLogger("snek_cli") def markdown_to_ansi(text: str) -> str: import re def heading_sub(match: re.Match) -> str: hashes = match.group(1) content = match.group(2).strip() level = len(hashes) if level == 1: return f"\033[1;4m{content}\033[0m" # Bold + Underline elif level == 2: return f"\033[1m{content}\033[0m" elif level == 3: return f"\033[1;36m{content}\033[0m" elif level == 4: return f"\033[36m{content}\033[0m" elif level == 5: return f"\033[2;36m{content}\033[0m" else: return content text = re.sub(r"^(#{1,5})\s+(.+)$", heading_sub, text, flags=re.MULTILINE) text = re.sub(r"^\s*---+\s*$", "\033[38;5;244m" + "─" * 40 + "\033[0m", text, flags=re.MULTILINE) def ordered_list_sub(match: re.Match) -> str: idx = match.group(1) content = match.group(2) return f"\033[38;5;33m{idx}.\033[0m {content}" text = re.sub(r"^(\d+)\.\s+(.*)$", ordered_list_sub, text, flags=re.MULTILINE) def unordered_list_sub(match: re.Match) -> str: bullet = match.group(1) content = match.group(2) return f"\033[38;5;33m•\033[0m {content}" text = re.sub(r"^(\s*[-*+])\s+(.*)$", unordered_list_sub, text, flags=re.MULTILINE) text = re.sub(r"`([^`]+)`", r"\033[38;5;244m\033[48;5;236m \1 \033[0m", text) text = re.sub(r"\*\*(.*?)\*\*", r"\033[1m\1\033[0m", text) text = re.sub(r"\*(.*?)\*", r"\033[3m\1\033[0m", text) return text class CliClient(Bot): active_channel_uid: Any = None _channel_cache: Dict[Any, Any] = {} def _display(self, message: str) -> None: ansi_message = markdown_to_ansi(message) sys.stdout.write('\r' + ' ' * 80 + '\r') sys.stdout.write(ansi_message + '\n') sys.stdout.write(f'[{self._get_active_channel_name()}]> ') sys.stdout.flush() def _get_active_channel_name(self) -> str: if self.active_channel_uid and self.active_channel_uid in self._channel_cache: return self._channel_cache[self.active_channel_uid]['name'] return "No Channel" async def send_message(self, channel_uid: Any, message: str) -> bool: if self.ws and not self.ws.closed: payload: Dict[str, Any] = { "method": "send_message", "args": [channel_uid, message, True], "kwargs": {}, "callId": None } await self.ws.send_json(payload) return True logger.error("Cannot send message, WebSocket is not connected.") return False async def _network_loop(self) -> None: try: async with aiohttp.ClientSession() as session: async with session.ws_connect(self.url) as ws: self.ws = ws self.rpc = RPC(self.ws) await self.rpc.login(self.username, self.password) self.user: Dict[str, Any] = await self.rpc.get_user(None) await self.on_init() while not self.ws.closed: data = await self.rpc.receive() if not data: break event: str = "?" try: event = data.event except AttributeError: pass try: message: str = data.message.strip() event = "message" except AttributeError: pass if event == "?": continue elif event == "message": if not data.is_final: continue if data.username == self.user["username"]: continue else: await self.on_message(data.username, data.user_nick, data.channel_uid, message) else: try: await getattr(self, "on_" + data.event)(**data.data) except AttributeError: logger.debug("Not implemented event: " + event) except Exception: logger.error("Network loop disconnected with an error.") traceback.print_exc() async def run(self) -> None: network_task = asyncio.create_task(self._network_loop()) while not self.user: if network_task.done(): network_task.result() return await asyncio.sleep(0.1) input_task = asyncio.create_task(self._input_loop()) done, pending = await asyncio.wait( [network_task, input_task], return_when=asyncio.FIRST_COMPLETED ) for task in pending: task.cancel() async def on_init(self) -> None: try: logger.info("Successfully logged in as %s.", self.user['username']) except KeyError: raise ValueError("\033[91mUsername or password incorrect!\033[0m") channels = await self.get_channels() for channel in channels: self._channel_cache[channel['uid']] = channel self._display(f"**Welcome, {self.user['nick']}!** Your available commands are:") self._display("> /join , /channels, /quit") await self.on_idle() async def on_message(self, username: str, user_nick: str, channel_uid: Any, message: str) -> None: sys.stdout.write('\a') sys.stdout.flush() channel_name = self._channel_cache.get(channel_uid, {}).get('name', 'unknown') self._display(f"**[{channel_name}]** *<{user_nick}>*: {message}") async def on_own_message(self, channel_uid: Any, message: str) -> None: channel_name = self._channel_cache.get(channel_uid, {}).get('name', 'unknown') full_message = f"**[{channel_name}]** *<{self.user['nick']}>*: {message}" self._display(full_message) async def on_idle(self) -> None: sys.stdout.write(f'[{self._get_active_channel_name()}]> ') sys.stdout.flush() async def _handle_command(self, line: str) -> bool: command, *args = line.strip().split() if command == "/join": if not args: self._display("> Usage: `/join `") return True channel_name_to_join = args[0] for uid, channel_data in self._channel_cache.items(): if channel_data['name'] == channel_name_to_join: self.active_channel_uid = uid self._display(f"*Active channel set to* **'{channel_name_to_join}'**.") return True self._display(f"*** Error: Channel '{channel_name_to_join}' not found.") elif command == "/channels": names = [ch['name'] for ch in self._channel_cache.values()] self._display(f"*Available channels*: `{'`, `'.join(names)}`") elif command == "/quit": self._display("*Disconnecting. Goodbye!*") return False else: self._display(f"*** Unknown command: {command}") return True async def _input_loop(self) -> None: loop = asyncio.get_event_loop() try: while True: line = await loop.run_in_executor(None, sys.stdin.readline) line = line.strip() if not line: continue if line.startswith("/"): if not await self._handle_command(line): break else: if not self.active_channel_uid: self._display("> No active channel. Use `/join ` to start talking.") continue await self.send_message(self.active_channel_uid, line) await self.on_own_message(self.active_channel_uid, line) finally: if self.ws and not self.ws.closed: await self.ws.close() if __name__ == "__main__": print("--- Snekbot CLI Client ---") url = "wss://snek.molodetz.nl/rpc.ws" username = input("Username: ") password = getpass.getpass("Password: ") client = CliClient(username, password, url=url) try: asyncio.run(client.run()) except KeyboardInterrupt: print("\nClient shut down by user.") except Exception as e: logger.error(f"An unexpected error occurred: {e}")