#!/usr/bin/env python3 import atexit import curses import os import pickle import queue import re import signal import socket import sys import threading import time class RPEditor: def __init__(self, filename=None, auto_save=False, timeout=30): """ Initialize RPEditor with enhanced robustness features. Args: filename: File to edit auto_save: Enable auto-save on exit timeout: Command timeout in seconds """ self.filename = filename self.lines = [""] self.cursor_y = 0 self.cursor_x = 0 self.mode = "normal" self.command = "" self.stdscr = None self.running = False self.thread = None self.socket_thread = None self.prev_key = None self.clipboard = "" self.undo_stack = [] self.redo_stack = [] self.selection_start = None self.selection_end = None self.max_undo = 100 self.lock = threading.RLock() self.command_queue = queue.Queue() self.auto_save = auto_save self.timeout = timeout self._cleanup_registered = False self._original_terminal_state = None self._exception_occurred = False # Create socket pair with error handling try: self.client_sock, self.server_sock = socket.socketpair() self.client_sock.settimeout(self.timeout) self.server_sock.settimeout(self.timeout) except Exception as e: self._cleanup() raise RuntimeError(f"Failed to create socket pair: {e}") # Register cleanup handlers self._register_cleanup() if filename: self.load_file() def _register_cleanup(self): """Register cleanup handlers for proper shutdown.""" if not self._cleanup_registered: atexit.register(self._cleanup) signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) self._cleanup_registered = True def _signal_handler(self, signum, frame): """Handle signals for clean shutdown.""" self._cleanup() sys.exit(0) def _cleanup(self): """Comprehensive cleanup of all resources.""" try: # Stop the editor self.running = False # Save if auto-save is enabled if self.auto_save and self.filename and not self._exception_occurred: try: self._save_file() except: pass # Clean up curses if self.stdscr: try: self.stdscr.keypad(False) curses.nocbreak() curses.echo() curses.curs_set(1) except: pass finally: try: curses.endwin() except: pass # Clear screen after curses cleanup try: os.system("clear" if os.name != "nt" else "cls") except: pass # Close sockets for sock in [self.client_sock, self.server_sock]: if sock: try: sock.close() except: pass # Wait for threads to finish for thread in [self.thread, self.socket_thread]: if thread and thread.is_alive(): thread.join(timeout=1) except: pass def load_file(self): """Load file with enhanced error handling.""" try: if os.path.exists(self.filename): with open(self.filename, encoding="utf-8", errors="replace") as f: content = f.read() self.lines = content.splitlines() if content else [""] else: self.lines = [""] except Exception: self.lines = [""] # Don't raise, just use empty content def _save_file(self): """Save file with enhanced error handling and backup.""" with self.lock: if not self.filename: return False try: # Create backup if file exists if os.path.exists(self.filename): backup_name = f"{self.filename}.bak" try: with open(self.filename, encoding="utf-8") as f: backup_content = f.read() with open(backup_name, "w", encoding="utf-8") as f: f.write(backup_content) except: pass # Backup failed, but continue with save # Save the file with open(self.filename, "w", encoding="utf-8") as f: f.write("\n".join(self.lines)) return True except Exception: return False def save_file(self): """Thread-safe save file command.""" if not self.running: return self._save_file() try: self.client_sock.send(pickle.dumps({"command": "save_file"})) except: return self._save_file() # Fallback to direct save def start(self): """Start the editor with enhanced error handling.""" if self.running: return False try: self.running = True self.socket_thread = threading.Thread(target=self.socket_listener, daemon=True) self.socket_thread.start() self.thread = threading.Thread(target=self.run, daemon=True) self.thread.start() return True except Exception as e: self.running = False self._cleanup() raise RuntimeError(f"Failed to start editor: {e}") def stop(self): """Stop the editor with proper cleanup.""" try: if self.client_sock: self.client_sock.send(pickle.dumps({"command": "stop"})) except: pass self.running = False time.sleep(0.1) # Give threads time to finish self._cleanup() def run(self): """Run the main editor loop with exception handling.""" try: curses.wrapper(self.main_loop) except Exception: self._exception_occurred = True self._cleanup() def main_loop(self, stdscr): """Main editor loop with enhanced error recovery.""" self.stdscr = stdscr try: # Configure curses curses.curs_set(1) self.stdscr.keypad(True) self.stdscr.timeout(100) # Non-blocking with timeout while self.running: try: # Process queued commands while True: try: command = self.command_queue.get_nowait() with self.lock: self.execute_command(command) except queue.Empty: break # Draw screen with self.lock: self.draw() # Handle input try: key = self.stdscr.getch() if key != -1: # -1 means timeout/no input with self.lock: self.handle_key(key) except curses.error: pass # Ignore curses errors except Exception: # Log error but continue running pass except Exception: self._exception_occurred = True finally: self._cleanup() def draw(self): """Draw the editor screen with error handling.""" try: self.stdscr.clear() height, width = self.stdscr.getmaxyx() # Draw lines for i, line in enumerate(self.lines): if i >= height - 1: break try: # Handle long lines and special characters display_line = line[: width - 1] if len(line) >= width else line self.stdscr.addstr(i, 0, display_line) except curses.error: pass # Skip lines that can't be displayed # Draw status line status = f"{self.mode.upper()} | {self.filename or 'untitled'} | {self.cursor_y+1}:{self.cursor_x+1}" if self.mode == "command": status = self.command[: width - 1] try: self.stdscr.addstr(height - 1, 0, status[: width - 1]) except curses.error: pass # Position cursor cursor_x = min(self.cursor_x, width - 1) cursor_y = min(self.cursor_y, height - 2) try: self.stdscr.move(cursor_y, cursor_x) except curses.error: pass self.stdscr.refresh() except Exception: pass # Continue even if draw fails def handle_key(self, key): """Handle keyboard input with error recovery.""" try: if self.mode == "normal": self.handle_normal(key) elif self.mode == "insert": self.handle_insert(key) elif self.mode == "command": self.handle_command(key) except Exception: pass # Continue on error def handle_normal(self, key): """Handle normal mode keys.""" try: if key == ord("h") or key == curses.KEY_LEFT: self.move_cursor(0, -1) elif key == ord("j") or key == curses.KEY_DOWN: self.move_cursor(1, 0) elif key == ord("k") or key == curses.KEY_UP: self.move_cursor(-1, 0) elif key == ord("l") or key == curses.KEY_RIGHT: self.move_cursor(0, 1) elif key == ord("i"): self.mode = "insert" elif key == ord(":"): self.mode = "command" self.command = ":" elif key == ord("x"): self._delete_char() elif key == ord("a"): self.cursor_x = min(self.cursor_x + 1, len(self.lines[self.cursor_y])) self.mode = "insert" elif key == ord("A"): self.cursor_x = len(self.lines[self.cursor_y]) self.mode = "insert" elif key == ord("o"): self._insert_line(self.cursor_y + 1, "") self.cursor_y += 1 self.cursor_x = 0 self.mode = "insert" elif key == ord("O"): self._insert_line(self.cursor_y, "") self.cursor_x = 0 self.mode = "insert" elif key == ord("d") and self.prev_key == ord("d"): if self.cursor_y < len(self.lines): self.clipboard = self.lines[self.cursor_y] self._delete_line(self.cursor_y) if self.cursor_y >= len(self.lines): self.cursor_y = max(0, len(self.lines) - 1) self.cursor_x = 0 elif key == ord("y") and self.prev_key == ord("y"): if self.cursor_y < len(self.lines): self.clipboard = self.lines[self.cursor_y] elif key == ord("p"): self._insert_line(self.cursor_y + 1, self.clipboard) self.cursor_y += 1 self.cursor_x = 0 elif key == ord("P"): self._insert_line(self.cursor_y, self.clipboard) self.cursor_x = 0 elif key == ord("w"): self._move_word_forward() elif key == ord("b"): self._move_word_backward() elif key == ord("0"): self.cursor_x = 0 elif key == ord("$"): self.cursor_x = len(self.lines[self.cursor_y]) elif key == ord("g"): if self.prev_key == ord("g"): self.cursor_y = 0 self.cursor_x = 0 elif key == ord("G"): self.cursor_y = max(0, len(self.lines) - 1) self.cursor_x = 0 elif key == ord("u"): self.undo() elif key == ord("r") and self.prev_key == 18: # Ctrl-R self.redo() self.prev_key = key except Exception: pass def _move_word_forward(self): """Move cursor forward by word.""" if self.cursor_y >= len(self.lines): return line = self.lines[self.cursor_y] i = self.cursor_x # Skip non-alphanumeric while i < len(line) and not line[i].isalnum(): i += 1 # Skip alphanumeric while i < len(line) and line[i].isalnum(): i += 1 self.cursor_x = i def _move_word_backward(self): """Move cursor backward by word.""" if self.cursor_y >= len(self.lines): return line = self.lines[self.cursor_y] i = max(0, self.cursor_x - 1) # Skip non-alphanumeric while i >= 0 and not line[i].isalnum(): i -= 1 # Skip alphanumeric while i >= 0 and line[i].isalnum(): i -= 1 self.cursor_x = max(0, i + 1) def handle_insert(self, key): """Handle insert mode keys.""" try: if key == 27: # ESC self.mode = "normal" if self.cursor_x > 0: self.cursor_x -= 1 elif key == 10 or key == 13: # Enter self._split_line() elif key == curses.KEY_BACKSPACE or key == 127 or key == 8: self._backspace() elif 32 <= key <= 126: char = chr(key) self._insert_char(char) except Exception: pass def handle_command(self, key): """Handle command mode keys.""" try: if key == 10 or key == 13: # Enter cmd = self.command[1:].strip() if cmd in ["q", "q!"]: self.running = False elif cmd == "w": self._save_file() elif cmd in ["wq", "wq!", "x", "xq", "x!"]: self._save_file() self.running = False elif cmd.startswith("w "): self.filename = cmd[2:].strip() self._save_file() self.mode = "normal" self.command = "" elif key == 27: # ESC self.mode = "normal" self.command = "" elif key == curses.KEY_BACKSPACE or key == 127 or key == 8: if len(self.command) > 1: self.command = self.command[:-1] elif 32 <= key <= 126: self.command += chr(key) except Exception: self.mode = "normal" self.command = "" def move_cursor(self, dy, dx): """Move cursor with bounds checking.""" if not self.lines: self.lines = [""] new_y = self.cursor_y + dy new_x = self.cursor_x + dx # Ensure valid Y position if 0 <= new_y < len(self.lines): self.cursor_y = new_y # Ensure valid X position for new line max_x = len(self.lines[self.cursor_y]) self.cursor_x = max(0, min(new_x, max_x)) elif new_y < 0: self.cursor_y = 0 self.cursor_x = 0 elif new_y >= len(self.lines): self.cursor_y = max(0, len(self.lines) - 1) self.cursor_x = len(self.lines[self.cursor_y]) def save_state(self): """Save current state for undo.""" with self.lock: state = { "lines": list(self.lines), "cursor_y": self.cursor_y, "cursor_x": self.cursor_x, } self.undo_stack.append(state) if len(self.undo_stack) > self.max_undo: self.undo_stack.pop(0) self.redo_stack.clear() def undo(self): """Undo last change.""" with self.lock: if self.undo_stack: current_state = { "lines": list(self.lines), "cursor_y": self.cursor_y, "cursor_x": self.cursor_x, } self.redo_stack.append(current_state) state = self.undo_stack.pop() self.lines = state["lines"] self.cursor_y = min(state["cursor_y"], len(self.lines) - 1) self.cursor_x = min( state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0, ) def redo(self): """Redo last undone change.""" with self.lock: if self.redo_stack: current_state = { "lines": list(self.lines), "cursor_y": self.cursor_y, "cursor_x": self.cursor_x, } self.undo_stack.append(current_state) state = self.redo_stack.pop() self.lines = state["lines"] self.cursor_y = min(state["cursor_y"], len(self.lines) - 1) self.cursor_x = min( state["cursor_x"], len(self.lines[self.cursor_y]) if self.lines else 0, ) def _insert_text(self, text): """Insert text at cursor position.""" if not text: return self.save_state() lines = text.split("\n") if len(lines) == 1: # Single line insert if self.cursor_y >= len(self.lines): self.lines.append("") self.cursor_y = len(self.lines) - 1 line = self.lines[self.cursor_y] self.lines[self.cursor_y] = line[: self.cursor_x] + text + line[self.cursor_x :] self.cursor_x += len(text) else: # Multi-line insert if self.cursor_y >= len(self.lines): self.lines.append("") self.cursor_y = len(self.lines) - 1 first = self.lines[self.cursor_y][: self.cursor_x] + lines[0] last = lines[-1] + self.lines[self.cursor_y][self.cursor_x :] self.lines[self.cursor_y] = first for i in range(1, len(lines) - 1): self.lines.insert(self.cursor_y + i, lines[i]) self.lines.insert(self.cursor_y + len(lines) - 1, last) self.cursor_y += len(lines) - 1 self.cursor_x = len(lines[-1]) def insert_text(self, text): """Thread-safe text insertion.""" try: self.client_sock.send(pickle.dumps({"command": "insert_text", "text": text})) except: with self.lock: self._insert_text(text) def _delete_char(self): """Delete character at cursor.""" self.save_state() if self.cursor_y < len(self.lines) and self.cursor_x < len(self.lines[self.cursor_y]): line = self.lines[self.cursor_y] self.lines[self.cursor_y] = line[: self.cursor_x] + line[self.cursor_x + 1 :] def delete_char(self): """Thread-safe character deletion.""" try: self.client_sock.send(pickle.dumps({"command": "delete_char"})) except: with self.lock: self._delete_char() def _insert_char(self, char): """Insert single character.""" if self.cursor_y >= len(self.lines): self.lines.append("") self.cursor_y = len(self.lines) - 1 line = self.lines[self.cursor_y] self.lines[self.cursor_y] = line[: self.cursor_x] + char + line[self.cursor_x :] self.cursor_x += 1 def _split_line(self): """Split line at cursor.""" if self.cursor_y >= len(self.lines): self.lines.append("") self.cursor_y = len(self.lines) - 1 line = self.lines[self.cursor_y] self.lines[self.cursor_y] = line[: self.cursor_x] self.lines.insert(self.cursor_y + 1, line[self.cursor_x :]) self.cursor_y += 1 self.cursor_x = 0 def _backspace(self): """Handle backspace key.""" if self.cursor_x > 0: line = self.lines[self.cursor_y] self.lines[self.cursor_y] = line[: self.cursor_x - 1] + line[self.cursor_x :] self.cursor_x -= 1 elif self.cursor_y > 0: prev_len = len(self.lines[self.cursor_y - 1]) self.lines[self.cursor_y - 1] += self.lines[self.cursor_y] del self.lines[self.cursor_y] self.cursor_y -= 1 self.cursor_x = prev_len def _insert_line(self, line_num, text): """Insert a new line.""" self.save_state() line_num = max(0, min(line_num, len(self.lines))) self.lines.insert(line_num, text) def _delete_line(self, line_num): """Delete a line.""" self.save_state() if 0 <= line_num < len(self.lines): if len(self.lines) > 1: del self.lines[line_num] else: self.lines = [""] def _set_text(self, text): """Set entire text content.""" self.save_state() self.lines = text.splitlines() if text else [""] self.cursor_y = 0 self.cursor_x = 0 def set_text(self, text): """Thread-safe text setting.""" if not self.running: with self.lock: self._set_text(text) return try: self.client_sock.send(pickle.dumps({"command": "set_text", "text": text})) except: with self.lock: self._set_text(text) def _goto_line(self, line_num): """Go to specific line.""" line_num = max(0, min(line_num - 1, len(self.lines) - 1)) self.cursor_y = line_num self.cursor_x = 0 def goto_line(self, line_num): """Thread-safe goto line.""" try: self.client_sock.send(pickle.dumps({"command": "goto_line", "line_num": line_num})) except: with self.lock: self._goto_line(line_num) def get_text(self): """Get entire text content.""" try: self.client_sock.send(pickle.dumps({"command": "get_text"})) data = self.client_sock.recv(65536) return pickle.loads(data) except: with self.lock: return "\n".join(self.lines) def get_cursor(self): """Get cursor position.""" try: self.client_sock.send(pickle.dumps({"command": "get_cursor"})) data = self.client_sock.recv(4096) return pickle.loads(data) except: with self.lock: return (self.cursor_y, self.cursor_x) def get_file_info(self): """Get file information.""" try: self.client_sock.send(pickle.dumps({"command": "get_file_info"})) data = self.client_sock.recv(4096) return pickle.loads(data) except: with self.lock: return { "filename": self.filename, "lines": len(self.lines), "cursor": (self.cursor_y, self.cursor_x), "mode": self.mode, } def socket_listener(self): """Listen for socket commands with error handling.""" while self.running: try: data = self.server_sock.recv(65536) if not data: break command = pickle.loads(data) self.command_queue.put(command) except socket.timeout: continue except OSError: if self.running: continue else: break except Exception: continue def execute_command(self, command): """Execute command with error handling.""" try: cmd = command.get("command") if cmd == "insert_text": self._insert_text(command.get("text", "")) elif cmd == "delete_char": self._delete_char() elif cmd == "save_file": self._save_file() elif cmd == "set_text": self._set_text(command.get("text", "")) elif cmd == "goto_line": self._goto_line(command.get("line_num", 1)) elif cmd == "get_text": result = "\n".join(self.lines) self.server_sock.send(pickle.dumps(result)) elif cmd == "get_cursor": result = (self.cursor_y, self.cursor_x) self.server_sock.send(pickle.dumps(result)) elif cmd == "get_file_info": result = { "filename": self.filename, "lines": len(self.lines), "cursor": (self.cursor_y, self.cursor_x), "mode": self.mode, } self.server_sock.send(pickle.dumps(result)) elif cmd == "stop": self.running = False except Exception: pass # Additional public methods for backwards compatibility def move_cursor_to(self, y, x): """Move cursor to specific position.""" with self.lock: self.cursor_y = max(0, min(y, len(self.lines) - 1)) self.cursor_x = max(0, min(x, len(self.lines[self.cursor_y]))) def get_line(self, line_num): """Get specific line.""" with self.lock: if 0 <= line_num < len(self.lines): return self.lines[line_num] return None def get_lines(self, start, end): """Get range of lines.""" with self.lock: start = max(0, start) end = min(end, len(self.lines)) return self.lines[start:end] def insert_at_line(self, line_num, text): """Insert text at specific line.""" with self.lock: self.save_state() line_num = max(0, min(line_num, len(self.lines))) self.lines.insert(line_num, text) def delete_lines(self, start, end): """Delete range of lines.""" with self.lock: self.save_state() start = max(0, start) end = min(end, len(self.lines)) if start < end: del self.lines[start:end] if not self.lines: self.lines = [""] def replace_text(self, start_line, start_col, end_line, end_col, new_text): """Replace text in range.""" with self.lock: self.save_state() # Validate bounds start_line = max(0, min(start_line, len(self.lines) - 1)) end_line = max(0, min(end_line, len(self.lines) - 1)) if start_line == end_line: line = self.lines[start_line] start_col = max(0, min(start_col, len(line))) end_col = max(0, min(end_col, len(line))) self.lines[start_line] = line[:start_col] + new_text + line[end_col:] else: first_part = self.lines[start_line][:start_col] last_part = self.lines[end_line][end_col:] new_lines = new_text.split("\n") self.lines[start_line] = first_part + new_lines[0] del self.lines[start_line + 1 : end_line + 1] for i, new_line in enumerate(new_lines[1:], 1): self.lines.insert(start_line + i, new_line) if len(new_lines) > 1: self.lines[start_line + len(new_lines) - 1] += last_part else: self.lines[start_line] += last_part def search(self, pattern, start_line=0): """Search for pattern in text.""" with self.lock: results = [] try: for i in range(start_line, len(self.lines)): matches = re.finditer(pattern, self.lines[i]) for match in matches: results.append((i, match.start(), match.end())) except re.error: pass return results def replace_all(self, search_text, replace_text): """Replace all occurrences of text.""" with self.lock: self.save_state() for i in range(len(self.lines)): self.lines[i] = self.lines[i].replace(search_text, replace_text) def select_range(self, start_line, start_col, end_line, end_col): """Select text range.""" with self.lock: self.selection_start = (start_line, start_col) self.selection_end = (end_line, end_col) def get_selection(self): """Get selected text.""" with self.lock: if not self.selection_start or not self.selection_end: return "" sl, sc = self.selection_start el, ec = self.selection_end # Validate bounds if sl < 0 or sl >= len(self.lines) or el < 0 or el >= len(self.lines): return "" if sl == el: return self.lines[sl][sc:ec] result = [self.lines[sl][sc:]] for i in range(sl + 1, el): if i < len(self.lines): result.append(self.lines[i]) if el < len(self.lines): result.append(self.lines[el][:ec]) return "\n".join(result) def delete_selection(self): """Delete selected text.""" with self.lock: if not self.selection_start or not self.selection_end: return self.save_state() sl, sc = self.selection_start el, ec = self.selection_end if 0 <= sl < len(self.lines) and 0 <= el < len(self.lines): self.replace_text(sl, sc, el, ec, "") self.selection_start = None self.selection_end = None def apply_search_replace_block(self, search_block, replace_block): """Apply search and replace on block.""" with self.lock: self.save_state() search_lines = search_block.splitlines() replace_lines = replace_block.splitlines() for i in range(len(self.lines) - len(search_lines) + 1): match = True for j, search_line in enumerate(search_lines): if i + j >= len(self.lines): match = False break if self.lines[i + j].strip() != search_line.strip(): match = False break if match: # Preserve indentation indent = len(self.lines[i]) - len(self.lines[i].lstrip()) indented_replace = [" " * indent + line for line in replace_lines] self.lines[i : i + len(search_lines)] = indented_replace return True return False def apply_diff(self, diff_text): """Apply unified diff.""" with self.lock: self.save_state() try: lines = diff_text.split("\n") start_line = 0 for line in lines: if line.startswith("@@"): match = re.search(r"@@ -(\d+),?(\d*) \+(\d+),?(\d*) @@", line) if match: start_line = int(match.group(1)) - 1 elif line.startswith("-"): if start_line < len(self.lines): del self.lines[start_line] elif line.startswith("+"): self.lines.insert(start_line, line[1:]) start_line += 1 elif line and not line.startswith("\\"): start_line += 1 except Exception: pass def get_context(self, line_num, context_lines=3): """Get lines around specific line.""" with self.lock: start = max(0, line_num - context_lines) end = min(len(self.lines), line_num + context_lines + 1) return self.get_lines(start, end) def count_lines(self): """Count total lines.""" with self.lock: return len(self.lines) def close(self): """Close the editor.""" self.stop() def is_running(self): """Check if editor is running.""" return self.running def wait(self, timeout=None): """Wait for editor to finish.""" if self.thread and self.thread.is_alive(): self.thread.join(timeout=timeout) return not self.thread.is_alive() return True def __enter__(self): """Context manager entry.""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" if exc_type: self._exception_occurred = True self.stop() return False def __del__(self): """Destructor for cleanup.""" self._cleanup() def main(): """Main entry point with error handling.""" editor = None try: filename = sys.argv[1] if len(sys.argv) > 1 else None # Parse additional arguments auto_save = "--auto-save" in sys.argv # Create and start editor editor = RPEditor(filename, auto_save=auto_save) editor.start() # Wait for editor to finish if editor.thread: editor.thread.join() except KeyboardInterrupt: pass except Exception as e: print(f"Error: {e}", file=sys.stderr) finally: if editor: editor.stop() # Ensure screen is cleared os.system("clear" if os.name != "nt" else "cls") if __name__ == "__main__": if "rpe" in sys.argv[0]: main()