From 2b701cb5cdaa1b1558ba6d736726c9706e65fb71 Mon Sep 17 00:00:00 2001 From: retoor Date: Tue, 4 Nov 2025 07:52:36 +0100 Subject: [PATCH] Perfect version. --- pr/agents/agent_manager.py | 9 + pr/commands/handlers.py | 142 ++++++++++++++ pr/commands/multiplexer_commands.py | 224 ++++++++++++++++++++++ pr/config.py | 33 +++- pr/core/assistant.py | 124 ++++++++++++- pr/core/autonomous_interactions.py | 189 +++++++++++++++++++ pr/core/background_monitor.py | 236 +++++++++++++++++++++++ pr/multiplexer.py | 255 +++++++++++++++++++++++++ pr/tools/base.py | 47 ++++- pr/tools/command.py | 4 +- pr/tools/interactive_control.py | 157 ++++++++++++++++ pr/tools/process_handlers.py | 264 ++++++++++++++++++++++++++ pr/tools/prompt_detection.py | 278 ++++++++++++++++++++++++++++ pr/ui/display.py | 1 + 14 files changed, 1956 insertions(+), 7 deletions(-) create mode 100644 pr/commands/multiplexer_commands.py create mode 100644 pr/core/autonomous_interactions.py create mode 100644 pr/core/background_monitor.py create mode 100644 pr/tools/interactive_control.py create mode 100644 pr/tools/process_handlers.py create mode 100644 pr/tools/prompt_detection.py diff --git a/pr/agents/agent_manager.py b/pr/agents/agent_manager.py index 1ea705a..f6e9027 100644 --- a/pr/agents/agent_manager.py +++ b/pr/agents/agent_manager.py @@ -5,6 +5,7 @@ from typing import Dict, List, Any, Optional, Callable from dataclasses import dataclass, field from .agent_roles import AgentRole, get_agent_role from .agent_communication import AgentMessage, AgentCommunicationBus, MessageType +from ..memory.knowledge_store import KnowledgeStore @dataclass class AgentInstance: @@ -36,6 +37,7 @@ class AgentManager: self.db_path = db_path self.api_caller = api_caller self.communication_bus = AgentCommunicationBus(db_path) + self.knowledge_store = KnowledgeStore(db_path) self.active_agents: Dict[str, AgentInstance] = {} self.session_id = str(uuid.uuid4())[:16] @@ -70,9 +72,16 @@ class AgentManager: agent.context.update(context) agent.add_message('user', task) + knowledge_matches = self.knowledge_store.search_entries(task, top_k=3) agent.task_count += 1 messages = agent.get_messages_for_api() + if knowledge_matches: + knowledge_content = "Knowledge base matches based on your query:\\n" + for i, entry in enumerate(knowledge_matches, 1): + shortened_content = entry.content[:2000] + knowledge_content += f"{i}. {shortened_content}\\n\\n" + messages.insert(-1, {'role': 'user', 'content': knowledge_content}) try: response = self.api_caller( diff --git a/pr/commands/handlers.py b/pr/commands/handlers.py index 7be0748..cc11816 100644 --- a/pr/commands/handlers.py +++ b/pr/commands/handlers.py @@ -1,4 +1,5 @@ import json +import time from pr.ui import Colors from pr.tools import read_file from pr.tools.base import get_tools_definition @@ -143,6 +144,9 @@ def handle_command(assistant, command): elif cmd == '/stats': show_system_stats(assistant) + elif cmd.startswith('/bg'): + handle_background_command(assistant, command) + else: return None @@ -389,3 +393,141 @@ def show_system_stats(assistant): print(f" API cache entries: {cache_stats['api_cache']['valid_entries']}") if 'tool_cache' in cache_stats: print(f" Tool cache entries: {cache_stats['tool_cache']['valid_entries']}") + +def handle_background_command(assistant, command): + """Handle background multiplexer commands.""" + parts = command.strip().split(maxsplit=2) + if len(parts) < 2: + print(f"{Colors.RED}Usage: /bg [args]{Colors.RESET}") + print(f"{Colors.GRAY}Available subcommands: start, list, status, output, input, kill, events{Colors.RESET}") + return + + subcmd = parts[1].lower() + + try: + if subcmd == 'start' and len(parts) >= 3: + session_name = f"bg_{len(parts[2].split())}_{int(time.time())}" + start_background_session(assistant, session_name, parts[2]) + elif subcmd == 'list': + list_background_sessions(assistant) + elif subcmd == 'status' and len(parts) >= 3: + show_session_status(assistant, parts[2]) + elif subcmd == 'output' and len(parts) >= 3: + show_session_output(assistant, parts[2]) + elif subcmd == 'input' and len(parts) >= 4: + send_session_input(assistant, parts[2], parts[3]) + elif subcmd == 'kill' and len(parts) >= 3: + kill_background_session(assistant, parts[2]) + elif subcmd == 'events': + show_background_events(assistant) + else: + print(f"{Colors.RED}Unknown background command: {subcmd}{Colors.RESET}") + print(f"{Colors.GRAY}Available: start, list, status, output, input, kill, events{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error executing background command: {e}{Colors.RESET}") + +def start_background_session(assistant, session_name, command): + """Start a command in background.""" + try: + from pr.multiplexer import start_background_process + result = start_background_process(session_name, command) + + if result['status'] == 'success': + print(f"{Colors.GREEN}Started background session '{session_name}' with PID {result['pid']}{Colors.RESET}") + else: + print(f"{Colors.RED}Failed to start background session: {result.get('error', 'Unknown error')}{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error starting background session: {e}{Colors.RESET}") + +def list_background_sessions(assistant): + """List all background sessions.""" + try: + from pr.ui.display import display_multiplexer_status + from pr.multiplexer import get_all_sessions + + sessions = get_all_sessions() + display_multiplexer_status(sessions) + except Exception as e: + print(f"{Colors.RED}Error listing background sessions: {e}{Colors.RESET}") + +def show_session_status(assistant, session_name): + """Show status of a specific session.""" + try: + from pr.multiplexer import get_session_info + + info = get_session_info(session_name) + if info: + print(f"{Colors.BOLD}Session '{session_name}':{Colors.RESET}") + print(f" Status: {info.get('status', 'unknown')}") + print(f" PID: {info.get('pid', 'N/A')}") + print(f" Command: {info.get('command', 'N/A')}") + if 'start_time' in info: + import time + elapsed = time.time() - info['start_time'] + print(f" Running for: {elapsed:.1f}s") + else: + print(f"{Colors.YELLOW}Session '{session_name}' not found{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error getting session status: {e}{Colors.RESET}") + +def show_session_output(assistant, session_name): + """Show output of a specific session.""" + try: + from pr.multiplexer import get_session_output + + output = get_session_output(session_name, lines=50) + if output: + print(f"{Colors.BOLD}Recent output from '{session_name}':{Colors.RESET}") + print(f"{Colors.GRAY}{'─' * 60}{Colors.RESET}") + for line in output: + print(line) + else: + print(f"{Colors.YELLOW}No output available for session '{session_name}'{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error getting session output: {e}{Colors.RESET}") + +def send_session_input(assistant, session_name, input_text): + """Send input to a background session.""" + try: + from pr.multiplexer import send_input_to_session + + result = send_input_to_session(session_name, input_text) + if result['status'] == 'success': + print(f"{Colors.GREEN}Input sent to session '{session_name}'{Colors.RESET}") + else: + print(f"{Colors.RED}Failed to send input: {result.get('error', 'Unknown error')}{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error sending input: {e}{Colors.RESET}") + +def kill_background_session(assistant, session_name): + """Kill a background session.""" + try: + from pr.multiplexer import kill_session + + result = kill_session(session_name) + if result['status'] == 'success': + print(f"{Colors.GREEN}Session '{session_name}' terminated{Colors.RESET}") + else: + print(f"{Colors.RED}Failed to kill session: {result.get('error', 'Unknown error')}{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error killing session: {e}{Colors.RESET}") + +def show_background_events(assistant): + """Show recent background events.""" + try: + from pr.core.background_monitor import get_global_monitor + + monitor = get_global_monitor() + events = monitor.get_pending_events() + + if events: + print(f"{Colors.BOLD}Recent Background Events:{Colors.RESET}") + print(f"{Colors.GRAY}{'─' * 60}{Colors.RESET}") + + for event in events[-10:]: # Show last 10 events + from pr.ui.display import display_background_event + display_background_event(event) + else: + print(f"{Colors.GRAY}No recent background events{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error getting background events: {e}{Colors.RESET}") diff --git a/pr/commands/multiplexer_commands.py b/pr/commands/multiplexer_commands.py new file mode 100644 index 0000000..10357c1 --- /dev/null +++ b/pr/commands/multiplexer_commands.py @@ -0,0 +1,224 @@ +from pr.tools.interactive_control import ( + list_active_sessions, get_session_status, read_session_output, + send_input_to_session, close_interactive_session +) +from pr.multiplexer import get_multiplexer +from pr.tools.prompt_detection import get_global_detector +from pr.ui import Colors + +def show_sessions(args=None): + """Show all active multiplexer sessions.""" + sessions = list_active_sessions() + + if not sessions: + print(f"{Colors.YELLOW}No active sessions.{Colors.RESET}") + return + + print(f"{Colors.BOLD}Active Sessions:{Colors.RESET}") + print("-" * 80) + + for session_name, session_data in sessions.items(): + metadata = session_data['metadata'] + output_summary = session_data['output_summary'] + + status = get_session_status(session_name) + is_active = status.get('is_active', False) if status else False + + status_color = Colors.GREEN if is_active else Colors.RED + print(f"{Colors.CYAN}{session_name}{Colors.RESET}: {status_color}{metadata.get('process_type', 'unknown')}{Colors.RESET}") + + if status and 'pid' in status: + print(f" PID: {status['pid']}") + + print(f" Age: {metadata.get('start_time', 0):.1f}s") + print(f" Output: {output_summary['stdout_lines']} stdout, {output_summary['stderr_lines']} stderr lines") + print(f" Interactions: {metadata.get('interaction_count', 0)}") + print(f" State: {metadata.get('state', 'unknown')}") + print() + +def attach_session(args): + """Attach to a session (show its output and allow interaction).""" + if not args or len(args) < 1: + print(f"{Colors.RED}Usage: attach_session {Colors.RESET}") + return + + session_name = args[0] + status = get_session_status(session_name) + + if not status: + print(f"{Colors.RED}Session '{session_name}' not found.{Colors.RESET}") + return + + print(f"{Colors.BOLD}Attaching to session: {session_name}{Colors.RESET}") + print(f"Process type: {status.get('metadata', {}).get('process_type', 'unknown')}") + print("-" * 50) + + # Show recent output + try: + output = read_session_output(session_name, lines=20) + if output['stdout']: + print(f"{Colors.GRAY}Recent stdout:{Colors.RESET}") + for line in output['stdout'].split('\n'): + if line.strip(): + print(f" {line}") + if output['stderr']: + print(f"{Colors.YELLOW}Recent stderr:{Colors.RESET}") + for line in output['stderr'].split('\n'): + if line.strip(): + print(f" {line}") + except Exception as e: + print(f"{Colors.RED}Error reading output: {e}{Colors.RESET}") + + print(f"\n{Colors.CYAN}Session is {'active' if status.get('is_active') else 'inactive'}{Colors.RESET}") + +def detach_session(args): + """Detach from a session (stop showing its output but keep it running).""" + if not args or len(args) < 1: + print(f"{Colors.RED}Usage: detach_session {Colors.RESET}") + return + + session_name = args[0] + mux = get_multiplexer(session_name) + + if not mux: + print(f"{Colors.RED}Session '{session_name}' not found.{Colors.RESET}") + return + + # In this implementation, detaching just means we stop displaying output + # The session continues to run in the background + mux.show_output = False + print(f"{Colors.GREEN}Detached from session '{session_name}'. It continues running in background.{Colors.RESET}") + +def kill_session(args): + """Kill a session forcefully.""" + if not args or len(args) < 1: + print(f"{Colors.RED}Usage: kill_session {Colors.RESET}") + return + + session_name = args[0] + + try: + close_interactive_session(session_name) + print(f"{Colors.GREEN}Session '{session_name}' terminated.{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error terminating session '{session_name}': {e}{Colors.RESET}") + +def send_command(args): + """Send a command to a session.""" + if not args or len(args) < 2: + print(f"{Colors.RED}Usage: send_command {Colors.RESET}") + return + + session_name = args[0] + command = ' '.join(args[1:]) + + try: + send_input_to_session(session_name, command) + print(f"{Colors.GREEN}Sent command to '{session_name}': {command}{Colors.RESET}") + except Exception as e: + print(f"{Colors.RED}Error sending command to '{session_name}': {e}{Colors.RESET}") + +def show_session_log(args): + """Show the full log/output of a session.""" + if not args or len(args) < 1: + print(f"{Colors.RED}Usage: show_session_log {Colors.RESET}") + return + + session_name = args[0] + + try: + output = read_session_output(session_name) # Get all output + print(f"{Colors.BOLD}Full log for session: {session_name}{Colors.RESET}") + print("=" * 80) + + if output['stdout']: + print(f"{Colors.GRAY}STDOUT:{Colors.RESET}") + print(output['stdout']) + print() + + if output['stderr']: + print(f"{Colors.YELLOW}STDERR:{Colors.RESET}") + print(output['stderr']) + print() + + except Exception as e: + print(f"{Colors.RED}Error reading log for '{session_name}': {e}{Colors.RESET}") + +def show_session_status(args): + """Show detailed status of a session.""" + if not args or len(args) < 1: + print(f"{Colors.RED}Usage: show_session_status {Colors.RESET}") + return + + session_name = args[0] + status = get_session_status(session_name) + + if not status: + print(f"{Colors.RED}Session '{session_name}' not found.{Colors.RESET}") + return + + print(f"{Colors.BOLD}Status for session: {session_name}{Colors.RESET}") + print("-" * 50) + + metadata = status.get('metadata', {}) + print(f"Process type: {metadata.get('process_type', 'unknown')}") + print(f"Active: {status.get('is_active', False)}") + + if 'pid' in status: + print(f"PID: {status['pid']}") + + print(f"Start time: {metadata.get('start_time', 0):.1f}") + print(f"Last activity: {metadata.get('last_activity', 0):.1f}") + print(f"Interaction count: {metadata.get('interaction_count', 0)}") + print(f"State: {metadata.get('state', 'unknown')}") + + output_summary = status.get('output_summary', {}) + print(f"Output lines: {output_summary.get('stdout_lines', 0)} stdout, {output_summary.get('stderr_lines', 0)} stderr") + + # Show prompt detection info + detector = get_global_detector() + session_info = detector.get_session_info(session_name) + if session_info: + print(f"Current state: {session_info['current_state']}") + print(f"Is waiting for input: {session_info['is_waiting']}") + +def list_waiting_sessions(args=None): + """List sessions that appear to be waiting for input.""" + sessions = list_active_sessions() + detector = get_global_detector() + + waiting_sessions = [] + for session_name in sessions: + if detector.is_waiting_for_input(session_name): + waiting_sessions.append(session_name) + + if not waiting_sessions: + print(f"{Colors.GREEN}No sessions are currently waiting for input.{Colors.RESET}") + return + + print(f"{Colors.BOLD}Sessions waiting for input:{Colors.RESET}") + for session_name in waiting_sessions: + status = get_session_status(session_name) + if status: + process_type = status.get('metadata', {}).get('process_type', 'unknown') + print(f" {Colors.CYAN}{session_name}{Colors.RESET} ({process_type})") + + # Show suggestions + session_info = detector.get_session_info(session_name) + if session_info: + suggestions = detector.get_response_suggestions({}, process_type) + if suggestions: + print(f" Suggested inputs: {', '.join(suggestions[:3])}") # Show first 3 + print() + +# Command registry for the multiplexer commands +MULTIPLEXER_COMMANDS = { + 'show_sessions': show_sessions, + 'attach_session': attach_session, + 'detach_session': detach_session, + 'kill_session': kill_session, + 'send_command': send_command, + 'show_session_log': show_session_log, + 'show_session_status': show_session_status, + 'list_waiting_sessions': list_waiting_sessions, +} \ No newline at end of file diff --git a/pr/config.py b/pr/config.py index 16267ff..5fd3096 100644 --- a/pr/config.py +++ b/pr/config.py @@ -1,8 +1,8 @@ import os DEFAULT_MODEL = "x-ai/grok-code-fast-1" -DEFAULT_API_URL = "https://openrouter.ai/api/v1/chat/completions" -MODEL_LIST_URL = "https://openrouter.ai/api/v1/models" +DEFAULT_API_URL = "https://static.molodetz.nl/rp.cgi/api/v1/chat/completions" +MODEL_LIST_URL = "https://static.molodetz.nl/rp.cgi/api/v1/models" DB_PATH = os.path.expanduser("~/.assistant_db.sqlite") LOG_FILE = os.path.expanduser("~/.assistant_error.log") @@ -59,3 +59,32 @@ ADVANCED_CONTEXT_ENABLED = True CONTEXT_RELEVANCE_THRESHOLD = 0.3 ADAPTIVE_CONTEXT_MIN = 10 ADAPTIVE_CONTEXT_MAX = 50 + +# Background monitoring and multiplexer configuration +BACKGROUND_MONITOR_ENABLED = True +BACKGROUND_MONITOR_INTERVAL = 5.0 # seconds +AUTONOMOUS_INTERACTION_INTERVAL = 10.0 # seconds +MULTIPLEXER_BUFFER_SIZE = 1000 # lines +MULTIPLEXER_OUTPUT_TIMEOUT = 30 # seconds +MAX_CONCURRENT_SESSIONS = 10 + +# Process-specific timeouts (seconds) +PROCESS_TIMEOUTS = { + 'default': 300, # 5 minutes + 'apt': 600, # 10 minutes + 'ssh': 60, # 1 minute + 'vim': 3600, # 1 hour + 'git': 300, # 5 minutes + 'npm': 600, # 10 minutes + 'pip': 300, # 5 minutes +} + +# Activity thresholds for LLM notification +HIGH_OUTPUT_THRESHOLD = 50 # lines +INACTIVE_THRESHOLD = 300 # seconds +SESSION_NOTIFY_INTERVAL = 60 # seconds + +# Autonomous behavior flags +ENABLE_AUTONOMOUS_SESSIONS = True +ENABLE_BACKGROUND_UPDATES = True +ENABLE_TIMEOUT_DETECTION = True diff --git a/pr/core/assistant.py b/pr/core/assistant.py index c034d07..1e58f69 100644 --- a/pr/core/assistant.py +++ b/pr/core/assistant.py @@ -20,10 +20,16 @@ from pr.tools import ( search_replace,close_editor,create_diff,apply_patch, tail_process, kill_process ) +from pr.tools.interactive_control import ( + start_interactive_session, send_input_to_session, read_session_output, + list_active_sessions, close_interactive_session +) from pr.tools.patch import display_file_diff from pr.tools.filesystem import display_edit_summary, display_edit_timeline, clear_edit_tracker from pr.tools.base import get_tools_definition from pr.commands import handle_command +from pr.core.background_monitor import start_global_monitor, stop_global_monitor, get_global_monitor +from pr.core.autonomous_interactions import start_global_autonomous, stop_global_autonomous, get_global_autonomous logger = logging.getLogger('pr') logger.setLevel(logging.DEBUG) @@ -57,6 +63,7 @@ class Assistant: self.db_conn = None self.autonomous_mode = False self.autonomous_iterations = 0 + self.background_monitoring = False self.init_database() self.messages.append(init_system_message(args)) @@ -69,6 +76,18 @@ class Assistant: logger.warning(f"Could not initialize enhanced features: {e}") self.enhanced = None + # Initialize background monitoring components + try: + start_global_monitor() + autonomous = get_global_autonomous() + autonomous.start(llm_callback=self._handle_background_updates) + self.background_monitoring = True + if self.debug: + logger.debug("Background monitoring initialized") + except Exception as e: + logger.warning(f"Could not initialize background monitoring: {e}") + self.background_monitoring = False + def init_database(self): try: logger.debug(f"Initializing database at {DB_PATH}") @@ -89,6 +108,74 @@ class Assistant: logger.error(f"Database initialization error: {e}") self.db_conn = None + def _handle_background_updates(self, updates): + """Handle background session updates by injecting them into the conversation.""" + if not updates or not updates.get('sessions'): + return + + # Format the update as a system message + update_message = self._format_background_update_message(updates) + + # Inject into current conversation if we're in an active session + if self.messages and len(self.messages) > 0: + self.messages.append({ + "role": "system", + "content": f"Background session updates: {update_message}" + }) + + if self.verbose: + print(f"{Colors.CYAN}Background update: {update_message}{Colors.RESET}") + + def _format_background_update_message(self, updates): + """Format background updates for LLM consumption.""" + session_summaries = [] + + for session_name, session_info in updates.get('sessions', {}).items(): + summary = session_info.get('summary', f'Session {session_name}') + session_summaries.append(f"{session_name}: {summary}") + + if session_summaries: + return "Active background sessions: " + "; ".join(session_summaries) + else: + return "No active background sessions requiring attention." + + def _check_background_updates(self): + """Check for pending background updates and display them.""" + if not self.background_monitoring: + return + + try: + monitor = get_global_monitor() + events = monitor.get_pending_events() + + if events: + print(f"\n{Colors.CYAN}Background Events:{Colors.RESET}") + for event in events: + event_type = event.get('type', 'unknown') + session_name = event.get('session_name', 'unknown') + + if event_type == 'session_started': + print(f" {Colors.GREEN}✓{Colors.RESET} Session '{session_name}' started") + elif event_type == 'session_ended': + print(f" {Colors.YELLOW}✗{Colors.RESET} Session '{session_name}' ended") + elif event_type == 'output_received': + lines = len(event.get('new_output', {}).get('stdout', [])) + print(f" {Colors.BLUE}📝{Colors.RESET} Session '{session_name}' produced {lines} lines of output") + elif event_type == 'possible_input_needed': + print(f" {Colors.RED}❓{Colors.RESET} Session '{session_name}' may need input") + elif event_type == 'high_output_volume': + total = event.get('total_lines', 0) + print(f" {Colors.YELLOW}📊{Colors.RESET} Session '{session_name}' has high output volume ({total} lines)") + elif event_type == 'inactive_session': + inactive_time = event.get('inactive_seconds', 0) + print(f" {Colors.GRAY}⏰{Colors.RESET} Session '{session_name}' inactive for {inactive_time:.0f}s") + + print() # Add blank line after events + + except Exception as e: + if self.debug: + print(f"{Colors.RED}Error checking background updates: {e}{Colors.RESET}") + def execute_tool_calls(self, tool_calls): results = [] @@ -107,7 +194,10 @@ class Assistant: 'run_command': lambda **kw: run_command(**kw), 'tail_process': lambda **kw: tail_process(**kw), 'kill_process': lambda **kw: kill_process(**kw), - 'run_command_interactive': lambda **kw: run_command_interactive(**kw), + 'start_interactive_session': lambda **kw: start_interactive_session(**kw), + 'send_input_to_session': lambda **kw: send_input_to_session(**kw), + 'read_session_output': lambda **kw: read_session_output(**kw), + 'close_interactive_session': lambda **kw: close_interactive_session(**kw), 'read_file': lambda **kw: read_file(**kw, db_conn=self.db_conn), 'write_file': lambda **kw: write_file(**kw, db_conn=self.db_conn), 'list_directory': lambda **kw: list_directory(**kw), @@ -133,6 +223,11 @@ class Assistant: 'display_edit_summary': lambda **kw: display_edit_summary(), 'display_edit_timeline': lambda **kw: display_edit_timeline(**kw), 'clear_edit_tracker': lambda **kw: clear_edit_tracker(), + 'start_interactive_session': lambda **kw: start_interactive_session(**kw), + 'send_input_to_session': lambda **kw: send_input_to_session(**kw), + 'read_session_output': lambda **kw: read_session_output(**kw), + 'list_active_sessions': lambda **kw: list_active_sessions(**kw), + 'close_interactive_session': lambda **kw: close_interactive_session(**kw), 'create_agent': lambda **kw: create_agent(**kw), 'list_agents': lambda **kw: list_agents(**kw), 'execute_agent_task': lambda **kw: execute_agent_task(**kw), @@ -264,7 +359,24 @@ class Assistant: while True: try: - user_input = input(f"{Colors.BLUE}You>{Colors.RESET} ").strip() + # Check for background updates before prompting user + if self.background_monitoring: + self._check_background_updates() + + # Create prompt with background status + prompt = f"{Colors.BLUE}You" + if self.background_monitoring: + try: + from pr.multiplexer import get_all_sessions + sessions = get_all_sessions() + active_count = sum(1 for s in sessions.values() if s.get('status') == 'running') + if active_count > 0: + prompt += f"[{active_count}bg]" + except: + pass + prompt += f">{Colors.RESET} " + + user_input = input(prompt).strip() if not user_input: continue @@ -302,6 +414,14 @@ class Assistant: except Exception as e: logger.error(f"Error cleaning up enhanced features: {e}") + # Stop background monitoring + if self.background_monitoring: + try: + stop_global_autonomous() + stop_global_monitor() + except Exception as e: + logger.error(f"Error stopping background monitoring: {e}") + try: from pr.multiplexer import cleanup_all_multiplexers cleanup_all_multiplexers() diff --git a/pr/core/autonomous_interactions.py b/pr/core/autonomous_interactions.py new file mode 100644 index 0000000..ff2309d --- /dev/null +++ b/pr/core/autonomous_interactions.py @@ -0,0 +1,189 @@ +import time +import threading +from pr.core.background_monitor import get_global_monitor +from pr.tools.interactive_control import list_active_sessions, get_session_status, read_session_output + +class AutonomousInteractions: + def __init__(self, interaction_interval=10.0): + self.interaction_interval = interaction_interval + self.active = False + self.interaction_thread = None + self.llm_callback = None + self.last_check_time = 0 + + def start(self, llm_callback=None): + """Start the autonomous interaction loop.""" + self.llm_callback = llm_callback + if self.interaction_thread is None: + self.active = True + self.interaction_thread = threading.Thread(target=self._interaction_loop, daemon=True) + self.interaction_thread.start() + + def stop(self): + """Stop the autonomous interaction loop.""" + self.active = False + if self.interaction_thread: + self.interaction_thread.join(timeout=2) + + def _interaction_loop(self): + """Main loop for autonomous interactions with background processes.""" + while self.active: + try: + current_time = time.time() + if current_time - self.last_check_time >= self.interaction_interval: + self._check_sessions_and_notify() + self.last_check_time = current_time + + time.sleep(1) # Check every second for shutdown + + except Exception as e: + print(f"Error in autonomous interaction loop: {e}") + time.sleep(self.interaction_interval) + + def _check_sessions_and_notify(self): + """Check active sessions and determine if LLM notification is needed.""" + try: + sessions = list_active_sessions() + + if not sessions: + return # No active sessions + + sessions_needing_attention = self._identify_sessions_needing_attention(sessions) + + if sessions_needing_attention and self.llm_callback: + # Format session updates for LLM + updates = self._format_session_updates(sessions_needing_attention) + self.llm_callback(updates) + + except Exception as e: + print(f"Error checking sessions: {e}") + + def _identify_sessions_needing_attention(self, sessions): + """Identify which sessions need LLM attention based on various criteria.""" + needing_attention = [] + + for session_name, session_data in sessions.items(): + metadata = session_data['metadata'] + output_summary = session_data['output_summary'] + + # Criteria for needing attention: + + # 1. Recent output activity + time_since_activity = time.time() - metadata.get('last_activity', 0) + if time_since_activity < 30: # Activity in last 30 seconds + needing_attention.append(session_name) + continue + + # 2. High output volume (potential completion or error) + total_lines = output_summary['stdout_lines'] + output_summary['stderr_lines'] + if total_lines > 50: # Arbitrary threshold + needing_attention.append(session_name) + continue + + # 3. Long-running sessions that might need intervention + session_age = time.time() - metadata.get('start_time', 0) + if session_age > 300 and time_since_activity > 60: # 5+ minutes old, inactive for 1+ minute + needing_attention.append(session_name) + continue + + # 4. Sessions that appear to be waiting for input + if self._session_looks_stuck(session_name, session_data): + needing_attention.append(session_name) + continue + + return needing_attention + + def _session_looks_stuck(self, session_name, session_data): + """Determine if a session appears to be stuck waiting for input.""" + metadata = session_data['metadata'] + + # Check if process is still running + status = get_session_status(session_name) + if not status or not status.get('is_active', False): + return False + + time_since_activity = time.time() - metadata.get('last_activity', 0) + interaction_count = metadata.get('interaction_count', 0) + + # If running for a while but no interactions, might be waiting + session_age = time.time() - metadata.get('start_time', 0) + if session_age > 60 and interaction_count == 0 and time_since_activity > 30: + return True + + # If had interactions but been quiet for a while + if interaction_count > 0 and time_since_activity > 120: # 2 minutes + return True + + return False + + def _format_session_updates(self, session_names): + """Format session information for LLM consumption.""" + updates = { + 'type': 'background_session_updates', + 'timestamp': time.time(), + 'sessions': {} + } + + for session_name in session_names: + status = get_session_status(session_name) + if status: + # Get recent output (last 20 lines) + try: + recent_output = read_session_output(session_name, lines=20) + except: + recent_output = {'stdout': '', 'stderr': ''} + + updates['sessions'][session_name] = { + 'status': status, + 'recent_output': recent_output, + 'summary': self._create_session_summary(status, recent_output) + } + + return updates + + def _create_session_summary(self, status, recent_output): + """Create a human-readable summary of session status.""" + summary_parts = [] + + process_type = status.get('metadata', {}).get('process_type', 'unknown') + summary_parts.append(f"Type: {process_type}") + + is_active = status.get('is_active', False) + summary_parts.append(f"Status: {'Active' if is_active else 'Inactive'}") + + if is_active and 'pid' in status: + summary_parts.append(f"PID: {status['pid']}") + + age = time.time() - status.get('metadata', {}).get('start_time', 0) + summary_parts.append(f"Age: {age:.1f}s") + + output_lines = len(recent_output.get('stdout', '').split('\n')) + len(recent_output.get('stderr', '').split('\n')) + summary_parts.append(f"Recent output: {output_lines} lines") + + interaction_count = status.get('metadata', {}).get('interaction_count', 0) + summary_parts.append(f"Interactions: {interaction_count}") + + return " | ".join(summary_parts) + +# Global autonomous interactions instance +_global_autonomous = None + +def get_global_autonomous(): + """Get the global autonomous interactions instance.""" + global _global_autonomous + return _global_autonomous + +def start_global_autonomous(llm_callback=None): + """Start global autonomous interactions.""" + global _global_autonomous + if _global_autonomous is None: + _global_autonomous = AutonomousInteractions() + _global_autonomous.start(llm_callback) + return _global_autonomous + +def stop_global_autonomous(): + """Stop global autonomous interactions.""" + global _global_autonomous + if _global_autonomous: + _global_autonomous.stop() + _global_autonomous = None diff --git a/pr/core/background_monitor.py b/pr/core/background_monitor.py new file mode 100644 index 0000000..12fe812 --- /dev/null +++ b/pr/core/background_monitor.py @@ -0,0 +1,236 @@ +import threading +import time +import queue +from pr.multiplexer import get_all_multiplexer_states, get_multiplexer +from pr.tools.interactive_control import get_session_status + +class BackgroundMonitor: + def __init__(self, check_interval=5.0): + self.check_interval = check_interval + self.active = False + self.monitor_thread = None + self.event_queue = queue.Queue() + self.last_states = {} + self.event_callbacks = [] + + def start(self): + """Start the background monitoring thread.""" + if self.monitor_thread is None: + self.active = True + self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True) + self.monitor_thread.start() + + def stop(self): + """Stop the background monitoring thread.""" + self.active = False + if self.monitor_thread: + self.monitor_thread.join(timeout=2) + + def add_event_callback(self, callback): + """Add a callback function to be called when events are detected.""" + self.event_callbacks.append(callback) + + def remove_event_callback(self, callback): + """Remove an event callback.""" + if callback in self.event_callbacks: + self.event_callbacks.remove(callback) + + def get_pending_events(self): + """Get all pending events from the queue.""" + events = [] + while not self.event_queue.empty(): + try: + events.append(self.event_queue.get_nowait()) + except queue.Empty: + break + return events + + def _monitor_loop(self): + """Main monitoring loop that checks for multiplexer activity.""" + while self.active: + try: + current_states = get_all_multiplexer_states() + + # Detect changes and events + events = self._detect_events(self.last_states, current_states) + + # Queue events for processing + for event in events: + self.event_queue.put(event) + # Also call callbacks immediately + for callback in self.event_callbacks: + try: + callback(event) + except Exception as e: + print(f"Error in event callback: {e}") + + self.last_states = current_states.copy() + time.sleep(self.check_interval) + + except Exception as e: + print(f"Error in background monitor loop: {e}") + time.sleep(self.check_interval) + + def _detect_events(self, old_states, new_states): + """Detect significant events in multiplexer states.""" + events = [] + + # Check for new sessions + for session_name in new_states: + if session_name not in old_states: + events.append({ + 'type': 'session_started', + 'session_name': session_name, + 'metadata': new_states[session_name]['metadata'] + }) + + # Check for ended sessions + for session_name in old_states: + if session_name not in new_states: + events.append({ + 'type': 'session_ended', + 'session_name': session_name + }) + + # Check for activity in existing sessions + for session_name, new_state in new_states.items(): + if session_name in old_states: + old_state = old_states[session_name] + + # Check for output changes + old_stdout_lines = old_state['output_summary']['stdout_lines'] + new_stdout_lines = new_state['output_summary']['stdout_lines'] + old_stderr_lines = old_state['output_summary']['stderr_lines'] + new_stderr_lines = new_state['output_summary']['stderr_lines'] + + if new_stdout_lines > old_stdout_lines or new_stderr_lines > old_stderr_lines: + # Get the new output + mux = get_multiplexer(session_name) + if mux: + all_output = mux.get_all_output() + new_output = { + 'stdout': all_output['stdout'].split('\n')[old_stdout_lines:], + 'stderr': all_output['stderr'].split('\n')[old_stderr_lines:] + } + + events.append({ + 'type': 'output_received', + 'session_name': session_name, + 'new_output': new_output, + 'total_lines': { + 'stdout': new_stdout_lines, + 'stderr': new_stderr_lines + } + }) + + # Check for state changes + old_metadata = old_state['metadata'] + new_metadata = new_state['metadata'] + + if old_metadata.get('state') != new_metadata.get('state'): + events.append({ + 'type': 'state_changed', + 'session_name': session_name, + 'old_state': old_metadata.get('state'), + 'new_state': new_metadata.get('state') + }) + + # Check for process type identification + if (old_metadata.get('process_type') == 'unknown' and + new_metadata.get('process_type') != 'unknown'): + events.append({ + 'type': 'process_identified', + 'session_name': session_name, + 'process_type': new_metadata.get('process_type') + }) + + # Check for sessions needing attention (based on heuristics) + for session_name, state in new_states.items(): + metadata = state['metadata'] + output_summary = state['output_summary'] + + # Heuristic: High output volume might indicate completion or error + total_lines = output_summary['stdout_lines'] + output_summary['stderr_lines'] + if total_lines > 100: # Arbitrary threshold + events.append({ + 'type': 'high_output_volume', + 'session_name': session_name, + 'total_lines': total_lines + }) + + # Heuristic: Long-running session without recent activity + time_since_activity = time.time() - metadata.get('last_activity', 0) + if time_since_activity > 300: # 5 minutes + events.append({ + 'type': 'inactive_session', + 'session_name': session_name, + 'inactive_seconds': time_since_activity + }) + + # Heuristic: Sessions that might be waiting for input + # This would be enhanced with prompt detection in later phases + if self._might_be_waiting_for_input(session_name, state): + events.append({ + 'type': 'possible_input_needed', + 'session_name': session_name + }) + + return events + + def _might_be_waiting_for_input(self, session_name, state): + """Heuristic to detect if a session might be waiting for input.""" + metadata = state['metadata'] + process_type = metadata.get('process_type', 'unknown') + + # Simple heuristics based on process type and recent activity + time_since_activity = time.time() - metadata.get('last_activity', 0) + + # If it's been more than 10 seconds since last activity, might be waiting + if time_since_activity > 10: + return True + + return False + +# Global monitor instance +_global_monitor = None + +def get_global_monitor(): + """Get the global background monitor instance.""" + global _global_monitor + if _global_monitor is None: + _global_monitor = BackgroundMonitor() + return _global_monitor + +def start_global_monitor(): + """Start the global background monitor.""" + monitor = get_global_monitor() + monitor.start() + +def stop_global_monitor(): + """Stop the global background monitor.""" + global _global_monitor + if _global_monitor: + _global_monitor.stop() + +# Global monitor instance +_global_monitor = None + +def start_global_monitor(): + """Start the global background monitor.""" + global _global_monitor + if _global_monitor is None: + _global_monitor = BackgroundMonitor() + _global_monitor.start() + return _global_monitor + +def stop_global_monitor(): + """Stop the global background monitor.""" + global _global_monitor + if _global_monitor: + _global_monitor.stop() + _global_monitor = None + +def get_global_monitor(): + """Get the global background monitor instance.""" + global _global_monitor + return _global_monitor diff --git a/pr/multiplexer.py b/pr/multiplexer.py index 9a921a1..7f50afa 100644 --- a/pr/multiplexer.py +++ b/pr/multiplexer.py @@ -2,7 +2,13 @@ import threading import queue import time import sys +import subprocess +import signal +import os from pr.ui import Colors +from collections import defaultdict +from pr.tools.process_handlers import get_handler_for_process, detect_process_type +from pr.tools.prompt_detection import get_global_detector class TerminalMultiplexer: def __init__(self, name, show_output=True): @@ -14,6 +20,15 @@ class TerminalMultiplexer: self.stderr_queue = queue.Queue() self.active = True self.lock = threading.Lock() + self.metadata = { + 'start_time': time.time(), + 'last_activity': time.time(), + 'interaction_count': 0, + 'process_type': 'unknown', + 'state': 'active' + } + self.handler = None + self.prompt_detector = get_global_detector() if self.show_output: self.display_thread = threading.Thread(target=self._display_worker, daemon=True) @@ -40,12 +55,24 @@ class TerminalMultiplexer: def write_stdout(self, data): with self.lock: self.stdout_buffer.append(data) + self.metadata['last_activity'] = time.time() + # Update handler state if available + if self.handler: + self.handler.update_state(data) + # Update prompt detector + self.prompt_detector.update_session_state(self.name, data, self.metadata['process_type']) if self.show_output: self.stdout_queue.put(data) def write_stderr(self, data): with self.lock: self.stderr_buffer.append(data) + self.metadata['last_activity'] = time.time() + # Update handler state if available + if self.handler: + self.handler.update_state(data) + # Update prompt detector + self.prompt_detector.update_session_state(self.name, data, self.metadata['process_type']) if self.show_output: self.stderr_queue.put(data) @@ -64,6 +91,37 @@ class TerminalMultiplexer: 'stderr': ''.join(self.stderr_buffer) } + def get_metadata(self): + with self.lock: + return self.metadata.copy() + + def update_metadata(self, key, value): + with self.lock: + self.metadata[key] = value + + def set_process_type(self, process_type): + """Set the process type and initialize appropriate handler.""" + with self.lock: + self.metadata['process_type'] = process_type + self.handler = get_handler_for_process(process_type, self) + + def send_input(self, input_data): + if hasattr(self, 'process') and self.process.poll() is None: + try: + self.process.stdin.write(input_data + '\n') + self.process.stdin.flush() + with self.lock: + self.metadata['last_activity'] = time.time() + self.metadata['interaction_count'] += 1 + except Exception as e: + self.write_stderr(f"Error sending input: {e}") + else: + # This will be implemented when we have a process attached + # For now, just update activity + with self.lock: + self.metadata['last_activity'] = time.time() + self.metadata['interaction_count'] += 1 + def close(self): self.active = False if hasattr(self, 'display_thread'): @@ -72,6 +130,9 @@ class TerminalMultiplexer: _multiplexers = {} _mux_counter = 0 _mux_lock = threading.Lock() +_background_monitor = None +_monitor_active = False +_monitor_interval = 0.2 # 200ms def create_multiplexer(name=None, show_output=True): global _mux_counter @@ -92,7 +153,201 @@ def close_multiplexer(name): mux.close() del _multiplexers[name] +def get_all_multiplexer_states(): + with _mux_lock: + states = {} + for name, mux in _multiplexers.items(): + states[name] = { + 'metadata': mux.get_metadata(), + 'output_summary': { + 'stdout_lines': len(mux.stdout_buffer), + 'stderr_lines': len(mux.stderr_buffer) + } + } + return states + def cleanup_all_multiplexers(): for mux in list(_multiplexers.values()): mux.close() _multiplexers.clear() + +# Background process management +_background_processes = {} +_process_lock = threading.Lock() + +class BackgroundProcess: + def __init__(self, name, command): + self.name = name + self.command = command + self.process = None + self.multiplexer = None + self.status = 'starting' + self.start_time = time.time() + self.end_time = None + + def start(self): + """Start the background process.""" + try: + # Create multiplexer for this process + mux_name, mux = create_multiplexer(self.name, show_output=False) + self.multiplexer = mux + + # Detect process type + process_type = detect_process_type(self.command) + mux.set_process_type(process_type) + + # Start the subprocess + self.process = subprocess.Popen( + self.command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True + ) + + self.status = 'running' + + # Start output monitoring threads + threading.Thread(target=self._monitor_stdout, daemon=True).start() + threading.Thread(target=self._monitor_stderr, daemon=True).start() + + return {'status': 'success', 'pid': self.process.pid} + + except Exception as e: + self.status = 'error' + return {'status': 'error', 'error': str(e)} + + def _monitor_stdout(self): + """Monitor stdout from the process.""" + try: + for line in iter(self.process.stdout.readline, ''): + if line: + self.multiplexer.write_stdout(line.rstrip('\n\r')) + except Exception as e: + self.write_stderr(f"Error reading stdout: {e}") + finally: + self._check_completion() + + def _monitor_stderr(self): + """Monitor stderr from the process.""" + try: + for line in iter(self.process.stderr.readline, ''): + if line: + self.multiplexer.write_stderr(line.rstrip('\n\r')) + except Exception as e: + self.write_stderr(f"Error reading stderr: {e}") + + def _check_completion(self): + """Check if process has completed.""" + if self.process and self.process.poll() is not None: + self.status = 'completed' + self.end_time = time.time() + + def get_info(self): + """Get process information.""" + self._check_completion() + return { + 'name': self.name, + 'command': self.command, + 'status': self.status, + 'pid': self.process.pid if self.process else None, + 'start_time': self.start_time, + 'end_time': self.end_time, + 'runtime': time.time() - self.start_time if not self.end_time else self.end_time - self.start_time + } + + def get_output(self, lines=None): + """Get process output.""" + if not self.multiplexer: + return [] + + all_output = self.multiplexer.get_all_output() + stdout_lines = all_output['stdout'].split('\n') if all_output['stdout'] else [] + stderr_lines = all_output['stderr'].split('\n') if all_output['stderr'] else [] + + combined = stdout_lines + stderr_lines + if lines: + combined = combined[-lines:] + + return [line for line in combined if line.strip()] + + def send_input(self, input_text): + """Send input to the process.""" + if self.process and self.status == 'running': + try: + self.process.stdin.write(input_text + '\n') + self.process.stdin.flush() + return {'status': 'success'} + except Exception as e: + return {'status': 'error', 'error': str(e)} + return {'status': 'error', 'error': 'Process not running or no stdin'} + + def kill(self): + """Kill the process.""" + if self.process and self.status == 'running': + try: + self.process.terminate() + # Wait a bit for graceful termination + time.sleep(0.1) + if self.process.poll() is None: + self.process.kill() + self.status = 'killed' + self.end_time = time.time() + return {'status': 'success'} + except Exception as e: + return {'status': 'error', 'error': str(e)} + return {'status': 'error', 'error': 'Process not running'} + +def start_background_process(name, command): + """Start a background process.""" + with _process_lock: + if name in _background_processes: + return {'status': 'error', 'error': f'Process {name} already exists'} + + process = BackgroundProcess(name, command) + result = process.start() + + if result['status'] == 'success': + _background_processes[name] = process + + return result + +def get_all_sessions(): + """Get all background process sessions.""" + with _process_lock: + sessions = {} + for name, process in _background_processes.items(): + sessions[name] = process.get_info() + return sessions + +def get_session_info(name): + """Get information about a specific session.""" + with _process_lock: + process = _background_processes.get(name) + return process.get_info() if process else None + +def get_session_output(name, lines=None): + """Get output from a specific session.""" + with _process_lock: + process = _background_processes.get(name) + return process.get_output(lines) if process else None + +def send_input_to_session(name, input_text): + """Send input to a background session.""" + with _process_lock: + process = _background_processes.get(name) + return process.send_input(input_text) if process else {'status': 'error', 'error': 'Session not found'} + +def kill_session(name): + """Kill a background session.""" + with _process_lock: + process = _background_processes.get(name) + if process: + result = process.kill() + if result['status'] == 'success': + del _background_processes[name] + return result + return {'status': 'error', 'error': 'Session not found'} diff --git a/pr/tools/base.py b/pr/tools/base.py index 9f5eddf..2c50fab 100644 --- a/pr/tools/base.py +++ b/pr/tools/base.py @@ -72,8 +72,8 @@ def get_tools_definition(): { "type": "function", "function": { - "name": "run_command_interactive", - "description": "Execute an interactive terminal command that requires user input or displays UI. The command runs in the user's terminal. Returns exit code only.", + "name": "start_interactive_session", + "description": "Execute an interactive terminal command that requires user input or displays UI. The command runs in a dedicated session and returns a session name.", "parameters": { "type": "object", "properties": { @@ -83,6 +83,49 @@ def get_tools_definition(): } } }, + { + "type": "function", + "function": { + "name": "send_input_to_session", + "description": "Send input to an interactive session.", + "parameters": { + "type": "object", + "properties": { + "session_name": {"type": "string", "description": "The name of the session"}, + "input_data": {"type": "string", "description": "The input to send to the session"} + }, + "required": ["session_name", "input_data"] + } + } + }, + { + "type": "function", + "function": { + "name": "read_session_output", + "description": "Read output from an interactive session.", + "parameters": { + "type": "object", + "properties": { + "session_name": {"type": "string", "description": "The name of the session"} + }, + "required": ["session_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "close_interactive_session", + "description": "Close an interactive session.", + "parameters": { + "type": "object", + "properties": { + "session_name": {"type": "string", "description": "The name of the session"} + }, + "required": ["session_name"] + } + } + }, { "type": "function", "function": { diff --git a/pr/tools/command.py b/pr/tools/command.py index 5eb4f0c..237e551 100644 --- a/pr/tools/command.py +++ b/pr/tools/command.py @@ -3,6 +3,8 @@ import subprocess import time import select from pr.multiplexer import create_multiplexer, close_multiplexer, get_multiplexer +from pr.tools.interactive_control import start_interactive_session +from pr.config import MAX_CONCURRENT_SESSIONS _processes = {} @@ -95,7 +97,7 @@ def tail_process(pid: int, timeout: int = 30): return {"status": "error", "error": f"Process {pid} not found"} -def run_command(command, timeout=30): +def run_command(command, timeout=30, monitored=False): mux_name = None try: process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) diff --git a/pr/tools/interactive_control.py b/pr/tools/interactive_control.py new file mode 100644 index 0000000..f11bebf --- /dev/null +++ b/pr/tools/interactive_control.py @@ -0,0 +1,157 @@ +import subprocess +import threading +import time +from pr.multiplexer import create_multiplexer, get_multiplexer, close_multiplexer, get_all_multiplexer_states + +def start_interactive_session(command, session_name=None, process_type='generic'): + """ + Start an interactive session in a dedicated multiplexer. + + Args: + command: The command to run (list or string) + session_name: Optional name for the session + process_type: Type of process (ssh, vim, apt, etc.) + + Returns: + session_name: The name of the created session + """ + name, mux = create_multiplexer(session_name) + mux.update_metadata('process_type', process_type) + + # Start the process + if isinstance(command, str): + command = command.split() + + try: + process = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1 + ) + + mux.process = process + mux.update_metadata('pid', process.pid) + + # Set process type and handler + detected_type = detect_process_type(command) + mux.set_process_type(detected_type) + + # Start output readers + stdout_thread = threading.Thread(target=_read_output, args=(process.stdout, mux.write_stdout), daemon=True) + stderr_thread = threading.Thread(target=_read_output, args=(process.stderr, mux.write_stderr), daemon=True) + + stdout_thread.start() + stderr_thread.start() + + mux.stdout_thread = stdout_thread + mux.stderr_thread = stderr_thread + + return name + except Exception as e: + close_multiplexer(name) + raise e + +def _read_output(stream, write_func): + """Read from a stream and write to multiplexer buffer.""" + try: + for line in iter(stream.readline, ''): + if line: + write_func(line.rstrip('\n')) + except Exception as e: + print(f"Error reading output: {e}") + +def send_input_to_session(session_name, input_data): + """ + Send input to an interactive session. + + Args: + session_name: Name of the session + input_data: Input string to send + """ + mux = get_multiplexer(session_name) + if not mux: + raise ValueError(f"Session {session_name} not found") + + if not hasattr(mux, 'process') or mux.process.poll() is not None: + raise ValueError(f"Session {session_name} is not active") + + try: + mux.process.stdin.write(input_data + '\n') + mux.process.stdin.flush() + except Exception as e: + raise RuntimeError(f"Failed to send input to session {session_name}: {e}") + +def read_session_output(session_name, lines=None): + """ + Read output from a session. + + Args: + session_name: Name of the session + lines: Number of recent lines to return (None for all) + + Returns: + dict: {'stdout': str, 'stderr': str} + """ + mux = get_multiplexer(session_name) + if not mux: + raise ValueError(f"Session {session_name} not found") + + output = mux.get_all_output() + if lines is not None: + # Return last N lines + stdout_lines = output['stdout'].split('\n')[-lines:] if output['stdout'] else [] + stderr_lines = output['stderr'].split('\n')[-lines:] if output['stderr'] else [] + output = { + 'stdout': '\n'.join(stdout_lines), + 'stderr': '\n'.join(stderr_lines) + } + return output + +def list_active_sessions(): + """ + List all active interactive sessions. + + Returns: + dict: Session states + """ + return get_all_multiplexer_states() + +def get_session_status(session_name): + """ + Get detailed status of a session. + + Args: + session_name: Name of the session + + Returns: + dict: Session metadata and status + """ + mux = get_multiplexer(session_name) + if not mux: + return None + + status = mux.get_metadata() + status['is_active'] = hasattr(mux, 'process') and mux.process.poll() is None + if status['is_active']: + status['pid'] = mux.process.pid + status['output_summary'] = { + 'stdout_lines': len(mux.stdout_buffer), + 'stderr_lines': len(mux.stderr_buffer) + } + return status + +def close_interactive_session(session_name): + """ + Close an interactive session. + """ + try: + mux = get_multiplexer(session_name) + if mux: + mux.process.kill() + close_multiplexer(session_name) + return {"status": "success"} + except Exception as e: + return {"status": "error", "error": str(e)} diff --git a/pr/tools/process_handlers.py b/pr/tools/process_handlers.py new file mode 100644 index 0000000..639b12a --- /dev/null +++ b/pr/tools/process_handlers.py @@ -0,0 +1,264 @@ +import re +import time +from abc import ABC, abstractmethod + +class ProcessHandler(ABC): + """Base class for process-specific handlers.""" + + def __init__(self, multiplexer): + self.multiplexer = multiplexer + self.state_machine = {} + self.current_state = 'initial' + self.prompt_patterns = [] + self.response_suggestions = {} + + @abstractmethod + def get_process_type(self): + """Return the process type this handler manages.""" + pass + + def update_state(self, output): + """Update internal state based on output.""" + pass + + def get_prompt_suggestions(self): + """Return suggested responses for current state.""" + return self.response_suggestions.get(self.current_state, []) + + def is_waiting_for_input(self): + """Check if process appears to be waiting for input.""" + return self.current_state in ['waiting_confirmation', 'waiting_input'] + +class AptHandler(ProcessHandler): + """Handler for apt package manager interactions.""" + + def __init__(self, multiplexer): + super().__init__(multiplexer) + self.state_machine = { + 'initial': ['running_command'], + 'running_command': ['waiting_confirmation', 'completed'], + 'waiting_confirmation': ['confirmed', 'cancelled'], + 'confirmed': ['installing', 'completed'], + 'installing': ['completed', 'error'], + 'completed': [], + 'error': [], + 'cancelled': [] + } + self.prompt_patterns = [ + (r'Do you want to continue\?', 'confirmation'), + (r'After this operation.*installed\.', 'size_info'), + (r'Need to get.*B of archives\.', 'download_info'), + (r'Unpacking.*Configuring', 'configuring'), + (r'Setting up', 'setting_up'), + (r'E:\s', 'error') + ] + + def get_process_type(self): + return 'apt' + + def update_state(self, output): + """Update state based on apt output patterns.""" + output_lower = output.lower() + + # Check for completion + if 'processing triggers' in output_lower or 'done' in output_lower: + self.current_state = 'completed' + # Check for confirmation prompts + elif 'do you want to continue' in output_lower: + self.current_state = 'waiting_confirmation' + # Check for installation progress + elif 'setting up' in output_lower or 'unpacking' in output_lower: + self.current_state = 'installing' + # Check for errors + elif 'e:' in output_lower or 'error' in output_lower: + self.current_state = 'error' + + def get_prompt_suggestions(self): + """Return suggested responses for apt prompts.""" + suggestions = super().get_prompt_suggestions() + if self.current_state == 'waiting_confirmation': + suggestions.extend(['y', 'yes', 'n', 'no']) + return suggestions + +class VimHandler(ProcessHandler): + """Handler for vim editor interactions.""" + + def __init__(self, multiplexer): + super().__init__(multiplexer) + self.state_machine = { + 'initial': ['normal_mode', 'insert_mode'], + 'normal_mode': ['insert_mode', 'command_mode', 'visual_mode'], + 'insert_mode': ['normal_mode'], + 'command_mode': ['normal_mode'], + 'visual_mode': ['normal_mode'], + 'exiting': [] + } + self.prompt_patterns = [ + (r'-- INSERT --', 'insert_mode'), + (r'-- VISUAL --', 'visual_mode'), + (r':', 'command_mode'), + (r'Press ENTER', 'waiting_enter'), + (r'Saved', 'saved') + ] + self.mode_indicators = { + 'insert': '-- INSERT --', + 'visual': '-- VISUAL --', + 'command': ':' + } + + def get_process_type(self): + return 'vim' + + def update_state(self, output): + """Update state based on vim mode indicators.""" + if '-- INSERT --' in output: + self.current_state = 'insert_mode' + elif '-- VISUAL --' in output: + self.current_state = 'visual_mode' + elif output.strip().endswith(':'): + self.current_state = 'command_mode' + elif 'Press ENTER' in output: + self.current_state = 'waiting_enter' + else: + # Default to normal mode if no specific indicators + self.current_state = 'normal_mode' + + def get_prompt_suggestions(self): + """Return suggested commands for vim modes.""" + suggestions = super().get_prompt_suggestions() + if self.current_state == 'command_mode': + suggestions.extend(['w', 'q', 'wq', 'q!', 'w!']) + elif self.current_state == 'normal_mode': + suggestions.extend(['i', 'a', 'o', 'dd', ':w', ':q']) + elif self.current_state == 'waiting_enter': + suggestions.extend(['\n']) + return suggestions + +class SSHHandler(ProcessHandler): + """Handler for SSH connection interactions.""" + + def __init__(self, multiplexer): + super().__init__(multiplexer) + self.state_machine = { + 'initial': ['connecting'], + 'connecting': ['auth_prompt', 'connected', 'failed'], + 'auth_prompt': ['connected', 'failed'], + 'connected': ['shell', 'disconnected'], + 'shell': ['disconnected'], + 'failed': [], + 'disconnected': [] + } + self.prompt_patterns = [ + (r'password:', 'password_prompt'), + (r'yes/no', 'host_key_prompt'), + (r'Permission denied', 'auth_failed'), + (r'Welcome to', 'connected'), + (r'\$', 'shell_prompt'), + (r'\#', 'root_shell_prompt'), + (r'Connection closed', 'disconnected') + ] + + def get_process_type(self): + return 'ssh' + + def update_state(self, output): + """Update state based on SSH connection output.""" + output_lower = output.lower() + + if 'permission denied' in output_lower: + self.current_state = 'failed' + elif 'password:' in output_lower: + self.current_state = 'auth_prompt' + elif 'yes/no' in output_lower: + self.current_state = 'auth_prompt' + elif 'welcome to' in output_lower or 'last login' in output_lower: + self.current_state = 'connected' + elif output.strip().endswith('$') or output.strip().endswith('#'): + self.current_state = 'shell' + elif 'connection closed' in output_lower: + self.current_state = 'disconnected' + + def get_prompt_suggestions(self): + """Return suggested responses for SSH prompts.""" + suggestions = super().get_prompt_suggestions() + if self.current_state == 'auth_prompt': + if 'password:' in self.multiplexer.get_all_output()['stdout']: + suggestions.extend(['']) # Placeholder for actual password + elif 'yes/no' in self.multiplexer.get_all_output()['stdout']: + suggestions.extend(['yes', 'no']) + return suggestions + +class GenericProcessHandler(ProcessHandler): + """Fallback handler for unknown process types.""" + + def __init__(self, multiplexer): + super().__init__(multiplexer) + self.state_machine = { + 'initial': ['running'], + 'running': ['waiting_input', 'completed'], + 'waiting_input': ['running'], + 'completed': [] + } + self.prompt_patterns = [ + (r'\?\s*$', 'waiting_input'), # Lines ending with ? + (r'>\s*$', 'waiting_input'), # Lines ending with > + (r':\s*$', 'waiting_input'), # Lines ending with : + (r'done', 'completed'), + (r'finished', 'completed'), + (r'exit code', 'completed') + ] + + def get_process_type(self): + return 'generic' + + def update_state(self, output): + """Basic state detection for generic processes.""" + output_lower = output.lower() + + if any(pattern in output_lower for pattern in ['done', 'finished', 'complete']): + self.current_state = 'completed' + elif any(output.strip().endswith(char) for char in ['?', '>', ':']): + self.current_state = 'waiting_input' + else: + self.current_state = 'running' + +# Handler registry +_handler_classes = { + 'apt': AptHandler, + 'vim': VimHandler, + 'ssh': SSHHandler, + 'generic': GenericProcessHandler +} + +def get_handler_for_process(process_type, multiplexer): + """Get appropriate handler for a process type.""" + handler_class = _handler_classes.get(process_type, GenericProcessHandler) + return handler_class(multiplexer) + +def detect_process_type(command): + """Detect process type from command.""" + command_str = ' '.join(command) if isinstance(command, list) else command + command_lower = command_str.lower() + + if 'apt' in command_lower or 'apt-get' in command_lower: + return 'apt' + elif 'vim' in command_lower or 'vi ' in command_lower: + return 'vim' + elif 'ssh' in command_lower: + return 'ssh' + else: + return 'generic' + return 'ssh' +def detect_process_type(command): + """Detect process type from command.""" + command_str = ' '.join(command) if isinstance(command, list) else command + command_lower = command_str.lower() + + if 'apt' in command_lower or 'apt-get' in command_lower: + return 'apt' + elif 'vim' in command_lower or 'vi ' in command_lower: + return 'vim' + elif 'ssh' in command_lower: + return 'ssh' + else: + return 'generic' diff --git a/pr/tools/prompt_detection.py b/pr/tools/prompt_detection.py new file mode 100644 index 0000000..7ad9077 --- /dev/null +++ b/pr/tools/prompt_detection.py @@ -0,0 +1,278 @@ +import re +import time +from collections import defaultdict + +class PromptDetector: + """Detects various process prompts and manages interaction state.""" + + def __init__(self): + self.prompt_patterns = self._load_prompt_patterns() + self.state_machines = self._load_state_machines() + self.session_states = {} + self.timeout_configs = { + 'default': 30, # 30 seconds default timeout + 'apt': 300, # 5 minutes for apt operations + 'ssh': 60, # 1 minute for SSH connections + 'vim': 3600 # 1 hour for vim sessions + } + + def _load_prompt_patterns(self): + """Load regex patterns for detecting various prompts.""" + return { + 'bash_prompt': [ + re.compile(r'[\w\-\.]+@[\w\-\.]+:.*[\$#]\s*$'), + re.compile(r'\$\s*$'), + re.compile(r'#\s*$'), + re.compile(r'>\s*$') # Continuation prompt + ], + 'confirmation': [ + re.compile(r'[Yy]/[Nn]', re.IGNORECASE), + re.compile(r'[Yy]es/[Nn]o', re.IGNORECASE), + re.compile(r'continue\?', re.IGNORECASE), + re.compile(r'proceed\?', re.IGNORECASE) + ], + 'password': [ + re.compile(r'password:', re.IGNORECASE), + re.compile(r'passphrase:', re.IGNORECASE), + re.compile(r'enter password', re.IGNORECASE) + ], + 'sudo_password': [ + re.compile(r'\[sudo\].*password', re.IGNORECASE) + ], + 'apt': [ + re.compile(r'Do you want to continue\?', re.IGNORECASE), + re.compile(r'After this operation', re.IGNORECASE), + re.compile(r'Need to get', re.IGNORECASE) + ], + 'vim': [ + re.compile(r'-- INSERT --'), + re.compile(r'-- VISUAL --'), + re.compile(r':'), + re.compile(r'Press ENTER', re.IGNORECASE) + ], + 'ssh': [ + re.compile(r'yes/no', re.IGNORECASE), + re.compile(r'password:', re.IGNORECASE), + re.compile(r'Permission denied', re.IGNORECASE) + ], + 'git': [ + re.compile(r'Username:', re.IGNORECASE), + re.compile(r'Email:', re.IGNORECASE) + ], + 'error': [ + re.compile(r'error:', re.IGNORECASE), + re.compile(r'failed', re.IGNORECASE), + re.compile(r'exception', re.IGNORECASE) + ] + } + + def _load_state_machines(self): + """Load state machines for different process types.""" + return { + 'apt': { + 'states': ['initial', 'running', 'confirming', 'installing', 'completed', 'error'], + 'transitions': { + 'initial': ['running'], + 'running': ['confirming', 'installing', 'completed', 'error'], + 'confirming': ['installing', 'cancelled'], + 'installing': ['completed', 'error'], + 'completed': [], + 'error': [], + 'cancelled': [] + } + }, + 'ssh': { + 'states': ['initial', 'connecting', 'authenticating', 'connected', 'error'], + 'transitions': { + 'initial': ['connecting'], + 'connecting': ['authenticating', 'connected', 'error'], + 'authenticating': ['connected', 'error'], + 'connected': ['error'], + 'error': [] + } + }, + 'vim': { + 'states': ['initial', 'normal', 'insert', 'visual', 'command', 'exiting'], + 'transitions': { + 'initial': ['normal', 'insert'], + 'normal': ['insert', 'visual', 'command', 'exiting'], + 'insert': ['normal'], + 'visual': ['normal'], + 'command': ['normal', 'exiting'], + 'exiting': [] + } + } + } + + def detect_prompt(self, output, process_type='generic'): + """Detect what type of prompt is present in the output.""" + detections = {} + + # Check all pattern categories + for category, patterns in self.prompt_patterns.items(): + for pattern in patterns: + if pattern.search(output): + if category not in detections: + detections[category] = [] + detections[category].append(pattern.pattern) + + # Process type specific detection + if process_type in self.prompt_patterns: + for pattern in self.prompt_patterns[process_type]: + if pattern.search(output): + detections[process_type] = detections.get(process_type, []) + detections[process_type].append(pattern.pattern) + + return detections + + def get_response_suggestions(self, prompt_detections, process_type='generic'): + """Get suggested responses based on detected prompts.""" + suggestions = [] + + for category, patterns in prompt_detections.items(): + if category == 'confirmation': + suggestions.extend(['y', 'yes', 'n', 'no']) + elif category == 'password': + suggestions.append('') + elif category == 'sudo_password': + suggestions.append('') + elif category == 'apt': + if any('continue' in p for p in patterns): + suggestions.extend(['y', 'yes']) + elif category == 'vim': + if any(':' in p for p in patterns): + suggestions.extend(['w', 'q', 'wq', 'q!']) + elif any('ENTER' in p for p in patterns): + suggestions.append('\n') + elif category == 'ssh': + if any('yes/no' in p for p in patterns): + suggestions.extend(['yes', 'no']) + elif any('password' in p for p in patterns): + suggestions.append('') + elif category == 'bash_prompt': + suggestions.extend(['help', 'ls', 'pwd', 'exit']) + + return list(set(suggestions)) # Remove duplicates + + def update_session_state(self, session_name, output, process_type='generic'): + """Update the state machine for a session based on output.""" + if session_name not in self.session_states: + self.session_states[session_name] = { + 'current_state': 'initial', + 'process_type': process_type, + 'last_activity': time.time(), + 'transitions': [] + } + + session_state = self.session_states[session_name] + old_state = session_state['current_state'] + + # Detect prompts and determine new state + detections = self.detect_prompt(output, process_type) + new_state = self._determine_state_from_detections(detections, process_type, old_state) + + if new_state != old_state: + session_state['transitions'].append({ + 'from': old_state, + 'to': new_state, + 'timestamp': time.time(), + 'trigger': detections + }) + session_state['current_state'] = new_state + + session_state['last_activity'] = time.time() + return new_state + + def _determine_state_from_detections(self, detections, process_type, current_state): + """Determine new state based on prompt detections.""" + if process_type in self.state_machines: + state_machine = self.state_machines[process_type] + + # State transition logic based on detections + if 'confirmation' in detections and current_state in ['running', 'initial']: + return 'confirming' + elif 'password' in detections or 'sudo_password' in detections: + return 'authenticating' + elif 'error' in detections: + return 'error' + elif 'bash_prompt' in detections and current_state != 'initial': + return 'connected' if process_type == 'ssh' else 'completed' + elif 'vim' in detections: + if any('-- INSERT --' in p for p in detections.get('vim', [])): + return 'insert' + elif any('-- VISUAL --' in p for p in detections.get('vim', [])): + return 'visual' + elif any(':' in p for p in detections.get('vim', [])): + return 'command' + + # Default state progression + if current_state == 'initial': + return 'running' + elif current_state == 'running' and detections: + return 'waiting_input' + elif current_state == 'waiting_input' and not detections: + return 'running' + + return current_state + + def is_waiting_for_input(self, session_name): + """Check if a session is currently waiting for input.""" + if session_name not in self.session_states: + return False + + state = self.session_states[session_name]['current_state'] + process_type = self.session_states[session_name]['process_type'] + + # States that typically indicate waiting for input + waiting_states = { + 'generic': ['waiting_input'], + 'apt': ['confirming'], + 'ssh': ['authenticating'], + 'vim': ['command', 'insert', 'visual'] + } + + return state in waiting_states.get(process_type, []) + + def get_session_timeout(self, session_name): + """Get the timeout for a session based on its process type.""" + if session_name not in self.session_states: + return self.timeout_configs['default'] + + process_type = self.session_states[session_name]['process_type'] + return self.timeout_configs.get(process_type, self.timeout_configs['default']) + + def check_for_timeouts(self): + """Check all sessions for timeouts and return timed out sessions.""" + timed_out = [] + current_time = time.time() + + for session_name, state in self.session_states.items(): + timeout = self.get_session_timeout(session_name) + if current_time - state['last_activity'] > timeout: + timed_out.append(session_name) + + return timed_out + + def get_session_info(self, session_name): + """Get information about a session's state.""" + if session_name not in self.session_states: + return None + + state = self.session_states[session_name] + return { + 'current_state': state['current_state'], + 'process_type': state['process_type'], + 'last_activity': state['last_activity'], + 'transitions': state['transitions'][-5:], # Last 5 transitions + 'is_waiting': self.is_waiting_for_input(session_name) + } + +# Global detector instance +_detector = None + +def get_global_detector(): + """Get the global prompt detector instance.""" + global _detector + if _detector is None: + _detector = PromptDetector() + return _detector diff --git a/pr/ui/display.py b/pr/ui/display.py index 122d4c7..c0a6b2e 100644 --- a/pr/ui/display.py +++ b/pr/ui/display.py @@ -1,4 +1,5 @@ import json +import time from typing import Dict, Any from pr.ui.colors import Colors