import re import time from collections import defaultdict class PromptDetector: """Detects various process prompts and manages interaction state.""" def __init__(self): self.prompt_patterns = self._load_prompt_patterns() self.state_machines = self._load_state_machines() self.session_states = {} self.timeout_configs = { 'default': 30, # 30 seconds default timeout 'apt': 300, # 5 minutes for apt operations 'ssh': 60, # 1 minute for SSH connections 'vim': 3600 # 1 hour for vim sessions } def _load_prompt_patterns(self): """Load regex patterns for detecting various prompts.""" return { 'bash_prompt': [ re.compile(r'[\w\-\.]+@[\w\-\.]+:.*[\$#]\s*$'), re.compile(r'\$\s*$'), re.compile(r'#\s*$'), re.compile(r'>\s*$') # Continuation prompt ], 'confirmation': [ re.compile(r'[Yy]/[Nn]', re.IGNORECASE), re.compile(r'[Yy]es/[Nn]o', re.IGNORECASE), re.compile(r'continue\?', re.IGNORECASE), re.compile(r'proceed\?', re.IGNORECASE) ], 'password': [ re.compile(r'password:', re.IGNORECASE), re.compile(r'passphrase:', re.IGNORECASE), re.compile(r'enter password', re.IGNORECASE) ], 'sudo_password': [ re.compile(r'\[sudo\].*password', re.IGNORECASE) ], 'apt': [ re.compile(r'Do you want to continue\?', re.IGNORECASE), re.compile(r'After this operation', re.IGNORECASE), re.compile(r'Need to get', re.IGNORECASE) ], 'vim': [ re.compile(r'-- INSERT --'), re.compile(r'-- VISUAL --'), re.compile(r':'), re.compile(r'Press ENTER', re.IGNORECASE) ], 'ssh': [ re.compile(r'yes/no', re.IGNORECASE), re.compile(r'password:', re.IGNORECASE), re.compile(r'Permission denied', re.IGNORECASE) ], 'git': [ re.compile(r'Username:', re.IGNORECASE), re.compile(r'Email:', re.IGNORECASE) ], 'error': [ re.compile(r'error:', re.IGNORECASE), re.compile(r'failed', re.IGNORECASE), re.compile(r'exception', re.IGNORECASE) ] } def _load_state_machines(self): """Load state machines for different process types.""" return { 'apt': { 'states': ['initial', 'running', 'confirming', 'installing', 'completed', 'error'], 'transitions': { 'initial': ['running'], 'running': ['confirming', 'installing', 'completed', 'error'], 'confirming': ['installing', 'cancelled'], 'installing': ['completed', 'error'], 'completed': [], 'error': [], 'cancelled': [] } }, 'ssh': { 'states': ['initial', 'connecting', 'authenticating', 'connected', 'error'], 'transitions': { 'initial': ['connecting'], 'connecting': ['authenticating', 'connected', 'error'], 'authenticating': ['connected', 'error'], 'connected': ['error'], 'error': [] } }, 'vim': { 'states': ['initial', 'normal', 'insert', 'visual', 'command', 'exiting'], 'transitions': { 'initial': ['normal', 'insert'], 'normal': ['insert', 'visual', 'command', 'exiting'], 'insert': ['normal'], 'visual': ['normal'], 'command': ['normal', 'exiting'], 'exiting': [] } } } def detect_prompt(self, output, process_type='generic'): """Detect what type of prompt is present in the output.""" detections = {} # Check all pattern categories for category, patterns in self.prompt_patterns.items(): for pattern in patterns: if pattern.search(output): if category not in detections: detections[category] = [] detections[category].append(pattern.pattern) # Process type specific detection if process_type in self.prompt_patterns: for pattern in self.prompt_patterns[process_type]: if pattern.search(output): detections[process_type] = detections.get(process_type, []) detections[process_type].append(pattern.pattern) return detections def get_response_suggestions(self, prompt_detections, process_type='generic'): """Get suggested responses based on detected prompts.""" suggestions = [] for category, patterns in prompt_detections.items(): if category == 'confirmation': suggestions.extend(['y', 'yes', 'n', 'no']) elif category == 'password': suggestions.append('') elif category == 'sudo_password': suggestions.append('') elif category == 'apt': if any('continue' in p for p in patterns): suggestions.extend(['y', 'yes']) elif category == 'vim': if any(':' in p for p in patterns): suggestions.extend(['w', 'q', 'wq', 'q!']) elif any('ENTER' in p for p in patterns): suggestions.append('\n') elif category == 'ssh': if any('yes/no' in p for p in patterns): suggestions.extend(['yes', 'no']) elif any('password' in p for p in patterns): suggestions.append('') elif category == 'bash_prompt': suggestions.extend(['help', 'ls', 'pwd', 'exit']) return list(set(suggestions)) # Remove duplicates def update_session_state(self, session_name, output, process_type='generic'): """Update the state machine for a session based on output.""" if session_name not in self.session_states: self.session_states[session_name] = { 'current_state': 'initial', 'process_type': process_type, 'last_activity': time.time(), 'transitions': [] } session_state = self.session_states[session_name] old_state = session_state['current_state'] # Detect prompts and determine new state detections = self.detect_prompt(output, process_type) new_state = self._determine_state_from_detections(detections, process_type, old_state) if new_state != old_state: session_state['transitions'].append({ 'from': old_state, 'to': new_state, 'timestamp': time.time(), 'trigger': detections }) session_state['current_state'] = new_state session_state['last_activity'] = time.time() return new_state def _determine_state_from_detections(self, detections, process_type, current_state): """Determine new state based on prompt detections.""" if process_type in self.state_machines: state_machine = self.state_machines[process_type] # State transition logic based on detections if 'confirmation' in detections and current_state in ['running', 'initial']: return 'confirming' elif 'password' in detections or 'sudo_password' in detections: return 'authenticating' elif 'error' in detections: return 'error' elif 'bash_prompt' in detections and current_state != 'initial': return 'connected' if process_type == 'ssh' else 'completed' elif 'vim' in detections: if any('-- INSERT --' in p for p in detections.get('vim', [])): return 'insert' elif any('-- VISUAL --' in p for p in detections.get('vim', [])): return 'visual' elif any(':' in p for p in detections.get('vim', [])): return 'command' # Default state progression if current_state == 'initial': return 'running' elif current_state == 'running' and detections: return 'waiting_input' elif current_state == 'waiting_input' and not detections: return 'running' return current_state def is_waiting_for_input(self, session_name): """Check if a session is currently waiting for input.""" if session_name not in self.session_states: return False state = self.session_states[session_name]['current_state'] process_type = self.session_states[session_name]['process_type'] # States that typically indicate waiting for input waiting_states = { 'generic': ['waiting_input'], 'apt': ['confirming'], 'ssh': ['authenticating'], 'vim': ['command', 'insert', 'visual'] } return state in waiting_states.get(process_type, []) def get_session_timeout(self, session_name): """Get the timeout for a session based on its process type.""" if session_name not in self.session_states: return self.timeout_configs['default'] process_type = self.session_states[session_name]['process_type'] return self.timeout_configs.get(process_type, self.timeout_configs['default']) def check_for_timeouts(self): """Check all sessions for timeouts and return timed out sessions.""" timed_out = [] current_time = time.time() for session_name, state in self.session_states.items(): timeout = self.get_session_timeout(session_name) if current_time - state['last_activity'] > timeout: timed_out.append(session_name) return timed_out def get_session_info(self, session_name): """Get information about a session's state.""" if session_name not in self.session_states: return None state = self.session_states[session_name] return { 'current_state': state['current_state'], 'process_type': state['process_type'], 'last_activity': state['last_activity'], 'transitions': state['transitions'][-5:], # Last 5 transitions 'is_waiting': self.is_waiting_for_input(session_name) } # Global detector instance _detector = None def get_global_detector(): """Get the global prompt detector instance.""" global _detector if _detector is None: _detector = PromptDetector() return _detector