import queue import subprocess import sys import threading import time from pr.tools.process_handlers import detect_process_type, get_handler_for_process from pr.tools.prompt_detection import get_global_detector from pr.ui import Colors class TerminalMultiplexer: def __init__(self, name, show_output=True): self.name = name self.show_output = show_output self.stdout_buffer = [] self.stderr_buffer = [] self.stdout_queue = queue.Queue() self.stderr_queue = queue.Queue() self.active = True self.lock = threading.Lock() self.metadata = { "start_time": time.time(), "last_activity": time.time(), "interaction_count": 0, "process_type": "unknown", "state": "active", } self.handler = None self.prompt_detector = get_global_detector() if self.show_output: self.display_thread = threading.Thread(target=self._display_worker, daemon=True) self.display_thread.start() def _display_worker(self): while self.active: try: line = self.stdout_queue.get(timeout=0.1) if line: if self.metadata.get("process_type") in ["vim", "ssh"]: sys.stdout.write(line) else: sys.stdout.write(f"{Colors.GRAY}[{self.name}]{Colors.RESET} {line}\n") sys.stdout.flush() except queue.Empty: pass try: line = self.stderr_queue.get(timeout=0.1) if line: if self.metadata.get("process_type") in ["vim", "ssh"]: sys.stderr.write(line) else: sys.stderr.write(f"{Colors.YELLOW}[{self.name} err]{Colors.RESET} {line}\n") sys.stderr.flush() except queue.Empty: pass def write_stdout(self, data): with self.lock: self.stdout_buffer.append(data) self.metadata["last_activity"] = time.time() # Update handler state if available if self.handler: self.handler.update_state(data) # Update prompt detector self.prompt_detector.update_session_state( self.name, data, self.metadata["process_type"] ) if self.show_output: self.stdout_queue.put(data) def write_stderr(self, data): with self.lock: self.stderr_buffer.append(data) self.metadata["last_activity"] = time.time() # Update handler state if available if self.handler: self.handler.update_state(data) # Update prompt detector self.prompt_detector.update_session_state( self.name, data, self.metadata["process_type"] ) if self.show_output: self.stderr_queue.put(data) def get_stdout(self): with self.lock: return "".join(self.stdout_buffer) def get_stderr(self): with self.lock: return "".join(self.stderr_buffer) def get_all_output(self): with self.lock: return { "stdout": "".join(self.stdout_buffer), "stderr": "".join(self.stderr_buffer), } def get_metadata(self): with self.lock: return self.metadata.copy() def update_metadata(self, key, value): with self.lock: self.metadata[key] = value def set_process_type(self, process_type): """Set the process type and initialize appropriate handler.""" with self.lock: self.metadata["process_type"] = process_type self.handler = get_handler_for_process(process_type, self) def send_input(self, input_data): if hasattr(self, "process") and self.process.poll() is None: try: self.process.stdin.write(input_data + "\n") self.process.stdin.flush() with self.lock: self.metadata["last_activity"] = time.time() self.metadata["interaction_count"] += 1 except Exception as e: self.write_stderr(f"Error sending input: {e}") else: # This will be implemented when we have a process attached # For now, just update activity with self.lock: self.metadata["last_activity"] = time.time() self.metadata["interaction_count"] += 1 def close(self): self.active = False if hasattr(self, "display_thread"): self.display_thread.join(timeout=1) _multiplexers = {} _mux_counter = 0 _mux_lock = threading.Lock() _background_monitor = None _monitor_active = False _monitor_interval = 0.2 # 200ms def create_multiplexer(name=None, show_output=True): global _mux_counter with _mux_lock: if name is None: _mux_counter += 1 name = f"process-{_mux_counter}" mux = TerminalMultiplexer(name, show_output) _multiplexers[name] = mux return name, mux def get_multiplexer(name): return _multiplexers.get(name) def close_multiplexer(name): mux = _multiplexers.get(name) if mux: mux.close() del _multiplexers[name] def get_all_multiplexer_states(): with _mux_lock: states = {} for name, mux in _multiplexers.items(): states[name] = { "metadata": mux.get_metadata(), "output_summary": { "stdout_lines": len(mux.stdout_buffer), "stderr_lines": len(mux.stderr_buffer), }, } return states def cleanup_all_multiplexers(): for mux in list(_multiplexers.values()): mux.close() _multiplexers.clear() # Background process management _background_processes = {} _process_lock = threading.Lock() class BackgroundProcess: def __init__(self, name, command): self.name = name self.command = command self.process = None self.multiplexer = None self.status = "starting" self.start_time = time.time() self.end_time = None def start(self): """Start the background process.""" try: # Create multiplexer for this process mux_name, mux = create_multiplexer(self.name, show_output=False) self.multiplexer = mux # Detect process type process_type = detect_process_type(self.command) mux.set_process_type(process_type) # Start the subprocess self.process = subprocess.Popen( self.command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, universal_newlines=True, ) self.status = "running" # Start output monitoring threads threading.Thread(target=self._monitor_stdout, daemon=True).start() threading.Thread(target=self._monitor_stderr, daemon=True).start() return {"status": "success", "pid": self.process.pid} except Exception as e: self.status = "error" return {"status": "error", "error": str(e)} def _monitor_stdout(self): """Monitor stdout from the process.""" try: for line in iter(self.process.stdout.readline, ""): if line: self.multiplexer.write_stdout(line.rstrip("\n\r")) except Exception as e: self.write_stderr(f"Error reading stdout: {e}") finally: self._check_completion() def _monitor_stderr(self): """Monitor stderr from the process.""" try: for line in iter(self.process.stderr.readline, ""): if line: self.multiplexer.write_stderr(line.rstrip("\n\r")) except Exception as e: self.write_stderr(f"Error reading stderr: {e}") def _check_completion(self): """Check if process has completed.""" if self.process and self.process.poll() is not None: self.status = "completed" self.end_time = time.time() def get_info(self): """Get process information.""" self._check_completion() return { "name": self.name, "command": self.command, "status": self.status, "pid": self.process.pid if self.process else None, "start_time": self.start_time, "end_time": self.end_time, "runtime": ( time.time() - self.start_time if not self.end_time else self.end_time - self.start_time ), } def get_output(self, lines=None): """Get process output.""" if not self.multiplexer: return [] all_output = self.multiplexer.get_all_output() stdout_lines = all_output["stdout"].split("\n") if all_output["stdout"] else [] stderr_lines = all_output["stderr"].split("\n") if all_output["stderr"] else [] combined = stdout_lines + stderr_lines if lines: combined = combined[-lines:] return [line for line in combined if line.strip()] def send_input(self, input_text): """Send input to the process.""" if self.process and self.status == "running": try: self.process.stdin.write(input_text + "\n") self.process.stdin.flush() return {"status": "success"} except Exception as e: return {"status": "error", "error": str(e)} return {"status": "error", "error": "Process not running or no stdin"} def kill(self): """Kill the process.""" if self.process and self.status == "running": try: self.process.terminate() # Wait a bit for graceful termination time.sleep(0.1) if self.process.poll() is None: self.process.kill() self.status = "killed" self.end_time = time.time() return {"status": "success"} except Exception as e: return {"status": "error", "error": str(e)} return {"status": "error", "error": "Process not running"} def start_background_process(name, command): """Start a background process.""" with _process_lock: if name in _background_processes: return {"status": "error", "error": f"Process {name} already exists"} process = BackgroundProcess(name, command) result = process.start() if result["status"] == "success": _background_processes[name] = process return result def get_all_sessions(): """Get all background process sessions.""" with _process_lock: sessions = {} for name, process in _background_processes.items(): sessions[name] = process.get_info() return sessions def get_session_info(name): """Get information about a specific session.""" with _process_lock: process = _background_processes.get(name) return process.get_info() if process else None def get_session_output(name, lines=None): """Get output from a specific session.""" with _process_lock: process = _background_processes.get(name) return process.get_output(lines) if process else None def send_input_to_session(name, input_text): """Send input to a background session.""" with _process_lock: process = _background_processes.get(name) return ( process.send_input(input_text) if process else {"status": "error", "error": "Session not found"} ) def kill_session(name): """Kill a background session.""" with _process_lock: process = _background_processes.get(name) if process: result = process.kill() if result["status"] == "success": del _background_processes[name] return result return {"status": "error", "error": "Session not found"}