"""Terminal multiplexing and background-process session management.

A ``TerminalMultiplexer`` buffers the stdout/stderr chunks of one named
session and can echo them to the real terminal with a coloured prefix.
A ``BackgroundProcess`` runs a shell command, feeds its output into a
multiplexer and exposes status, output, input and kill operations.
Module-level helpers keep registries of both, guarded by locks.
"""

import os
import queue
import signal
import subprocess
import sys
import threading
import time
from collections import defaultdict

from pr.ui import Colors
from pr.tools.process_handlers import get_handler_for_process, detect_process_type
from pr.tools.prompt_detection import get_global_detector


class TerminalMultiplexer:
    """Buffer (and optionally echo) the output streams of one named session."""

    def __init__(self, name, show_output=True):
        """Create the multiplexer.

        Args:
            name: Label used in the echo prefix and passed to the prompt
                detector as the session name.
            show_output: When true, a daemon thread echoes every written
                chunk to ``sys.stdout`` / ``sys.stderr``.
        """
        self.name = name
        self.show_output = show_output
        self.stdout_buffer = []
        self.stderr_buffer = []
        self.stdout_queue = queue.Queue()
        self.stderr_queue = queue.Queue()
        self.active = True
        self.lock = threading.Lock()
        self.metadata = {
            'start_time': time.time(),
            'last_activity': time.time(),
            'interaction_count': 0,
            'process_type': 'unknown',
            'state': 'active',
        }
        self.handler = None
        self.prompt_detector = get_global_detector()

        if self.show_output:
            self.display_thread = threading.Thread(
                target=self._display_worker, daemon=True
            )
            self.display_thread.start()

    @staticmethod
    def _echo_next(source, stream, prefix):
        """Wait up to 0.1s for one chunk on *source* and echo it to *stream*."""
        try:
            chunk = source.get(timeout=0.1)
        except queue.Empty:
            return
        if chunk:
            stream.write(f"{prefix}{chunk}")
            stream.flush()

    def _display_worker(self):
        """Echo queued chunks until ``close()`` clears ``self.active``."""
        while self.active:
            self._echo_next(
                self.stdout_queue,
                sys.stdout,
                f"{Colors.GRAY}[{self.name}]{Colors.RESET} ",
            )
            self._echo_next(
                self.stderr_queue,
                sys.stderr,
                f"{Colors.YELLOW}[{self.name} err]{Colors.RESET} ",
            )

    def _record(self, buffer, echo_queue, data):
        """Append *data* to *buffer*, notify observers and queue it for echo."""
        with self.lock:
            buffer.append(data)
            self.metadata['last_activity'] = time.time()

            # Update handler state if available
            if self.handler:
                self.handler.update_state(data)

            # Update prompt detector
            self.prompt_detector.update_session_state(
                self.name, data, self.metadata['process_type']
            )

            if self.show_output:
                echo_queue.put(data)

    def write_stdout(self, data):
        """Record a chunk of standard output."""
        self._record(self.stdout_buffer, self.stdout_queue, data)

    def write_stderr(self, data):
        """Record a chunk of standard error."""
        self._record(self.stderr_buffer, self.stderr_queue, data)

    def get_stdout(self):
        """Return all stdout chunks concatenated in write order."""
        with self.lock:
            return ''.join(self.stdout_buffer)

    def get_stderr(self):
        """Return all stderr chunks concatenated in write order."""
        with self.lock:
            return ''.join(self.stderr_buffer)

    def get_all_output(self):
        """Return ``{'stdout': str, 'stderr': str}`` taken under one lock."""
        with self.lock:
            return {
                'stdout': ''.join(self.stdout_buffer),
                'stderr': ''.join(self.stderr_buffer),
            }

    def get_metadata(self):
        """Return a shallow copy of the metadata dict."""
        with self.lock:
            return self.metadata.copy()

    def update_metadata(self, key, value):
        """Set a single metadata entry."""
        with self.lock:
            self.metadata[key] = value

    def set_process_type(self, process_type):
        """Set the process type and initialize appropriate handler."""
        with self.lock:
            self.metadata['process_type'] = process_type
            self.handler = get_handler_for_process(process_type, self)

    def _note_interaction(self):
        """Bump the activity timestamp and the interaction counter."""
        with self.lock:
            self.metadata['last_activity'] = time.time()
            self.metadata['interaction_count'] += 1

    def send_input(self, input_data):
        """Write *input_data* plus a newline to an attached process, if any.

        Nothing in this class sets ``self.process``; it is only used when a
        caller has attached one and it is still running. Without a live
        process the call just records the interaction. Write failures are
        reported through ``write_stderr`` rather than raised.
        """
        attached = getattr(self, 'process', None)
        if attached is not None and attached.poll() is None:
            try:
                attached.stdin.write(input_data + '\n')
                attached.stdin.flush()
                self._note_interaction()
            except Exception as e:
                self.write_stderr(f"Error sending input: {e}")
        else:
            # This will be implemented when we have a process attached
            # For now, just update activity
            self._note_interaction()

    def close(self):
        """Stop the echo thread (if one was started), waiting up to 1s."""
        self.active = False
        worker = getattr(self, 'display_thread', None)
        if worker is not None:
            worker.join(timeout=1)


_multiplexers = {}
_mux_counter = 0
_mux_lock = threading.Lock()
_background_monitor = None
_monitor_active = False
_monitor_interval = 0.2  # 200ms


def create_multiplexer(name=None, show_output=True):
    """Create and register a multiplexer; return ``(name, multiplexer)``.

    When *name* is ``None`` a name of the form ``process-<n>`` is generated.
    An existing registration under the same name is replaced.
    """
    global _mux_counter
    with _mux_lock:
        if name is None:
            _mux_counter += 1
            name = f"process-{_mux_counter}"
        mux = TerminalMultiplexer(name, show_output)
        _multiplexers[name] = mux
        return name, mux


def get_multiplexer(name):
    """Return the registered multiplexer called *name*, or ``None``."""
    with _mux_lock:
        return _multiplexers.get(name)


def close_multiplexer(name):
    """Unregister and close the multiplexer called *name*, if present."""
    with _mux_lock:
        mux = _multiplexers.pop(name, None)
    # close() may block up to a second joining the echo thread, so it is
    # done after the registry lock has been released.
    if mux is not None:
        mux.close()


def get_all_multiplexer_states():
    """Return metadata and buffered-chunk counts for every multiplexer."""
    with _mux_lock:
        registered = list(_multiplexers.items())

    states = {}
    for name, mux in registered:
        metadata = mux.get_metadata()
        with mux.lock:
            # The counts are numbers of buffered chunks, one per write call.
            summary = {
                'stdout_lines': len(mux.stdout_buffer),
                'stderr_lines': len(mux.stderr_buffer),
            }
        states[name] = {'metadata': metadata, 'output_summary': summary}
    return states


def cleanup_all_multiplexers():
    """Close every registered multiplexer and empty the registry."""
    with _mux_lock:
        doomed = list(_multiplexers.values())
        _multiplexers.clear()
    for mux in doomed:
        mux.close()


# Background process management
_background_processes = {}
_process_lock = threading.Lock()


class BackgroundProcess:
    """A shell command run in the background with its output captured."""

    def __init__(self, name, command):
        """Remember *name* and *command*; nothing is launched until ``start()``."""
        self.name = name
        self.command = command
        self.process = None
        self.multiplexer = None
        self.status = 'starting'
        self.start_time = time.time()
        self.end_time = None

    def start(self):
        """Start the background process.

        Returns:
            ``{'status': 'success', 'pid': int}`` on success, otherwise
            ``{'status': 'error', 'error': str}``.
        """
        mux_name = None
        try:
            # Create multiplexer for this process
            mux_name, mux = create_multiplexer(self.name, show_output=False)
            self.multiplexer = mux

            # Detect process type
            mux.set_process_type(detect_process_type(self.command))

            # Start the subprocess
            # NOTE(security): shell=True hands the command string to the
            # shell; callers must not pass untrusted input here.
            self.process = subprocess.Popen(
                self.command,
                shell=True,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,
            )
            self.status = 'running'

            # Start output monitoring threads
            threading.Thread(target=self._monitor_stdout, daemon=True).start()
            threading.Thread(target=self._monitor_stderr, daemon=True).start()

            return {'status': 'success', 'pid': self.process.pid}
        except Exception as e:
            self.status = 'error'
            # Do not leave a registered multiplexer behind for a process
            # that never started.
            if mux_name is not None:
                close_multiplexer(mux_name)
            return {'status': 'error', 'error': str(e)}

    def _pump(self, stream, sink, label):
        """Forward each line read from *stream* to *sink* until EOF."""
        try:
            for line in iter(stream.readline, ''):
                sink(line.rstrip('\n\r'))
        except Exception as e:
            # Report through the multiplexer; this class has no
            # write_stderr of its own.
            self.multiplexer.write_stderr(f"Error reading {label}: {e}")

    def _monitor_stdout(self):
        """Monitor stdout from the process."""
        try:
            self._pump(self.process.stdout, self.multiplexer.write_stdout, 'stdout')
        finally:
            self._check_completion()

    def _monitor_stderr(self):
        """Monitor stderr from the process."""
        self._pump(self.process.stderr, self.multiplexer.write_stderr, 'stderr')

    def _check_completion(self):
        """Check if process has completed."""
        # poll() also reaps an exited child. Only a running process may
        # become 'completed', so a 'killed' status and its end_time stay.
        if (
            self.process
            and self.process.poll() is not None
            and self.status == 'running'
        ):
            self.status = 'completed'
            self.end_time = time.time()

    def get_info(self):
        """Get process information."""
        self._check_completion()
        finished_at = self.end_time if self.end_time else time.time()
        return {
            'name': self.name,
            'command': self.command,
            'status': self.status,
            'pid': self.process.pid if self.process else None,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'runtime': finished_at - self.start_time,
        }

    def get_output(self, lines=None):
        """Get process output.

        Args:
            lines: If truthy, return only the last *lines* non-blank lines.

        Returns:
            Non-blank output lines, all stdout lines first and then all
            stderr lines (the streams are not interleaved). An empty list
            when the process has no multiplexer.
        """
        mux = self.multiplexer
        if not mux:
            return []

        with mux.lock:
            chunks = list(mux.stdout_buffer) + list(mux.stderr_buffer)

        # The monitors store one newline-stripped line per chunk, so split
        # each chunk separately instead of joining them into one string.
        collected = [
            text
            for chunk in chunks
            for text in chunk.split('\n')
            if text.strip()
        ]
        if lines:
            collected = collected[-lines:]
        return collected

    def send_input(self, input_text):
        """Send input to the process."""
        if not (self.process and self.status == 'running'):
            return {'status': 'error', 'error': 'Process not running or no stdin'}
        try:
            self.process.stdin.write(input_text + '\n')
            self.process.stdin.flush()
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
        return {'status': 'success'}

    def kill(self):
        """Kill the process."""
        if not (self.process and self.status == 'running'):
            return {'status': 'error', 'error': 'Process not running'}
        try:
            self.process.terminate()
            try:
                # Wait a bit for graceful termination
                self.process.wait(timeout=0.1)
            except subprocess.TimeoutExpired:
                self.process.kill()
            self.status = 'killed'
            self.end_time = time.time()
            return {'status': 'success'}
        except Exception as e:
            return {'status': 'error', 'error': str(e)}


def start_background_process(name, command):
    """Start a background process."""
    with _process_lock:
        if name in _background_processes:
            return {'status': 'error', 'error': f'Process {name} already exists'}

        process = BackgroundProcess(name, command)
        result = process.start()

        # Only successfully started processes are registered.
        if result['status'] == 'success':
            _background_processes[name] = process

        return result


def get_all_sessions():
    """Get all background process sessions."""
    with _process_lock:
        return {
            name: process.get_info()
            for name, process in _background_processes.items()
        }


def get_session_info(name):
    """Get information about a specific session."""
    with _process_lock:
        process = _background_processes.get(name)
        return process.get_info() if process else None


def get_session_output(name, lines=None):
    """Get output from a specific session."""
    with _process_lock:
        process = _background_processes.get(name)
        return process.get_output(lines) if process else None


def send_input_to_session(name, input_text):
    """Send input to a background session."""
    with _process_lock:
        process = _background_processes.get(name)
        if not process:
            return {'status': 'error', 'error': 'Session not found'}
        return process.send_input(input_text)


def kill_session(name):
    """Kill a background session."""
    with _process_lock:
        process = _background_processes.get(name)
        if not process:
            return {'status': 'error', 'error': 'Session not found'}

        result = process.kill()
        # The session is removed only when the kill succeeded.
        if result['status'] == 'success':
            del _background_processes[name]
        return result