From 6fff57dd81adf5cc6af5bd5d3811de030ec0e137 Mon Sep 17 00:00:00 2001 From: retoor Date: Thu, 24 Jul 2025 21:21:37 +0200 Subject: [PATCH] Initial commit. --- main.py | 233 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..317ce48 --- /dev/null +++ b/main.py @@ -0,0 +1,233 @@ +import asyncio +import getpass +import sys +import logging +import traceback +from typing import Dict, Any + +import aiohttp + +from snekbot.rpc import RPC +from snekbot.bot import Bot + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger: logging.Logger = logging.getLogger("snek_cli") + + +def markdown_to_ansi(text: str) -> str: + import re + + def heading_sub(match: re.Match) -> str: + hashes = match.group(1) + content = match.group(2).strip() + level = len(hashes) + if level == 1: + return f"\033[1;4m{content}\033[0m" # Bold + Underline + elif level == 2: + return f"\033[1m{content}\033[0m" + elif level == 3: + return f"\033[1;36m{content}\033[0m" + elif level == 4: + return f"\033[36m{content}\033[0m" + elif level == 5: + return f"\033[2;36m{content}\033[0m" + else: + return content + + text = re.sub(r"^(#{1,5})\s+(.+)$", heading_sub, text, flags=re.MULTILINE) + + text = re.sub(r"^\s*---+\s*$", "\033[38;5;244m" + "─" * 40 + "\033[0m", text, flags=re.MULTILINE) + + def ordered_list_sub(match: re.Match) -> str: + idx = match.group(1) + content = match.group(2) + return f"\033[38;5;33m{idx}.\033[0m {content}" + + text = re.sub(r"^(\d+)\.\s+(.*)$", ordered_list_sub, text, flags=re.MULTILINE) + + def unordered_list_sub(match: re.Match) -> str: + bullet = match.group(1) + content = match.group(2) + return f"\033[38;5;33m•\033[0m {content}" + + text = re.sub(r"^(\s*[-*+])\s+(.*)$", unordered_list_sub, text, flags=re.MULTILINE) + + text = re.sub(r"`([^`]+)`", r"\033[38;5;244m\033[48;5;236m \1 \033[0m", text) + + text = re.sub(r"\*\*(.*?)\*\*", r"\033[1m\1\033[0m", text) + + text = re.sub(r"\*(.*?)\*", r"\033[3m\1\033[0m", text) + + return text + + +class CliClient(Bot): + active_channel_uid: Any = None + _channel_cache: Dict[Any, Any] = {} + + def _display(self, message: str) -> None: + ansi_message = markdown_to_ansi(message) + sys.stdout.write('\r' + ' ' * 80 + '\r') + sys.stdout.write(ansi_message + '\n') + sys.stdout.write(f'[{self._get_active_channel_name()}]> ') + sys.stdout.flush() + + def _get_active_channel_name(self) -> str: + if self.active_channel_uid and self.active_channel_uid in self._channel_cache: + return self._channel_cache[self.active_channel_uid]['name'] + return "No Channel" + + async def send_message(self, channel_uid: Any, message: str) -> bool: + if self.ws and not self.ws.closed: + payload: Dict[str, Any] = { + "method": "send_message", + "args": [channel_uid, message, True], + "kwargs": {}, + "callId": None + } + await self.ws.send_json(payload) + return True + logger.error("Cannot send message, WebSocket is not connected.") + return False + + async def _network_loop(self) -> None: + try: + async with aiohttp.ClientSession() as session: + async with session.ws_connect(self.url) as ws: + self.ws = ws + self.rpc = RPC(self.ws) + await self.rpc.login(self.username, self.password) + self.user: Dict[str, Any] = await self.rpc.get_user(None) + await self.on_init() + while not self.ws.closed: + data = await self.rpc.receive() + if not data: + break + event: str = "?" + try: + event = data.event + except AttributeError: + pass + try: + message: str = data.message.strip() + event = "message" + except AttributeError: + pass + if event == "?": + continue + elif event == "message": + if not data.is_final: + continue + if data.username == self.user["username"]: + continue + else: + await self.on_message(data.username, data.user_nick, data.channel_uid, message) + else: + try: + await getattr(self, "on_" + data.event)(**data.data) + except AttributeError: + logger.debug("Not implemented event: " + event) + except Exception: + logger.error("Network loop disconnected with an error.") + traceback.print_exc() + + async def run(self) -> None: + network_task = asyncio.create_task(self._network_loop()) + while not self.user: + if network_task.done(): + network_task.result() + return + await asyncio.sleep(0.1) + input_task = asyncio.create_task(self._input_loop()) + done, pending = await asyncio.wait( + [network_task, input_task], + return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + + async def on_init(self) -> None: + try: + logger.info("Successfully logged in as %s.", self.user['username']) + except KeyError: + raise ValueError("\033[91mUsername or password incorrect!\033[0m") + + channels = await self.get_channels() + for channel in channels: + self._channel_cache[channel['uid']] = channel + self._display(f"**Welcome, {self.user['nick']}!** Your available commands are:") + self._display("> /join , /channels, /quit") + await self.on_idle() + + async def on_message(self, username: str, user_nick: str, channel_uid: Any, message: str) -> None: + sys.stdout.write('\a') + sys.stdout.flush() + channel_name = self._channel_cache.get(channel_uid, {}).get('name', 'unknown') + self._display(f"**[{channel_name}]** *<{user_nick}>*: {message}") + + async def on_own_message(self, channel_uid: Any, message: str) -> None: + channel_name = self._channel_cache.get(channel_uid, {}).get('name', 'unknown') + full_message = f"**[{channel_name}]** *<{self.user['nick']}>*: {message}" + self._display(full_message) + + async def on_idle(self) -> None: + sys.stdout.write(f'[{self._get_active_channel_name()}]> ') + sys.stdout.flush() + + async def _handle_command(self, line: str) -> bool: + command, *args = line.strip().split() + if command == "/join": + if not args: + self._display("> Usage: `/join `") + return True + channel_name_to_join = args[0] + for uid, channel_data in self._channel_cache.items(): + if channel_data['name'] == channel_name_to_join: + self.active_channel_uid = uid + self._display(f"*Active channel set to* **'{channel_name_to_join}'**.") + return True + self._display(f"*** Error: Channel '{channel_name_to_join}' not found.") + elif command == "/channels": + names = [ch['name'] for ch in self._channel_cache.values()] + self._display(f"*Available channels*: `{'`, `'.join(names)}`") + elif command == "/quit": + self._display("*Disconnecting. Goodbye!*") + return False + else: + self._display(f"*** Unknown command: {command}") + return True + + async def _input_loop(self) -> None: + loop = asyncio.get_event_loop() + try: + while True: + line = await loop.run_in_executor(None, sys.stdin.readline) + line = line.strip() + if not line: + continue + if line.startswith("/"): + if not await self._handle_command(line): + break + else: + if not self.active_channel_uid: + self._display("> No active channel. Use `/join ` to start talking.") + continue + await self.send_message(self.active_channel_uid, line) + await self.on_own_message(self.active_channel_uid, line) + finally: + if self.ws and not self.ws.closed: + await self.ws.close() + + +if __name__ == "__main__": + print("--- Snekbot CLI Client ---") + url = "wss://snek.molodetz.nl/rpc.ws" + username = input("Username: ") + password = getpass.getpass("Password: ") + client = CliClient(username, password, url=url) + try: + asyncio.run(client.run()) + except KeyboardInterrupt: + print("\nClient shut down by user.") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}")