279 lines
11 KiB
Python
Raw Normal View History

2025-11-04 07:52:36 +01:00
import re
import time
from collections import defaultdict
class PromptDetector:
"""Detects various process prompts and manages interaction state."""
def __init__(self):
self.prompt_patterns = self._load_prompt_patterns()
self.state_machines = self._load_state_machines()
self.session_states = {}
self.timeout_configs = {
'default': 30, # 30 seconds default timeout
'apt': 300, # 5 minutes for apt operations
'ssh': 60, # 1 minute for SSH connections
'vim': 3600 # 1 hour for vim sessions
}
def _load_prompt_patterns(self):
"""Load regex patterns for detecting various prompts."""
return {
'bash_prompt': [
re.compile(r'[\w\-\.]+@[\w\-\.]+:.*[\$#]\s*$'),
re.compile(r'\$\s*$'),
re.compile(r'#\s*$'),
re.compile(r'>\s*$') # Continuation prompt
],
'confirmation': [
re.compile(r'[Yy]/[Nn]', re.IGNORECASE),
re.compile(r'[Yy]es/[Nn]o', re.IGNORECASE),
re.compile(r'continue\?', re.IGNORECASE),
re.compile(r'proceed\?', re.IGNORECASE)
],
'password': [
re.compile(r'password:', re.IGNORECASE),
re.compile(r'passphrase:', re.IGNORECASE),
re.compile(r'enter password', re.IGNORECASE)
],
'sudo_password': [
re.compile(r'\[sudo\].*password', re.IGNORECASE)
],
'apt': [
re.compile(r'Do you want to continue\?', re.IGNORECASE),
re.compile(r'After this operation', re.IGNORECASE),
re.compile(r'Need to get', re.IGNORECASE)
],
'vim': [
re.compile(r'-- INSERT --'),
re.compile(r'-- VISUAL --'),
re.compile(r':'),
re.compile(r'Press ENTER', re.IGNORECASE)
],
'ssh': [
re.compile(r'yes/no', re.IGNORECASE),
re.compile(r'password:', re.IGNORECASE),
re.compile(r'Permission denied', re.IGNORECASE)
],
'git': [
re.compile(r'Username:', re.IGNORECASE),
re.compile(r'Email:', re.IGNORECASE)
],
'error': [
re.compile(r'error:', re.IGNORECASE),
re.compile(r'failed', re.IGNORECASE),
re.compile(r'exception', re.IGNORECASE)
]
}
def _load_state_machines(self):
"""Load state machines for different process types."""
return {
'apt': {
'states': ['initial', 'running', 'confirming', 'installing', 'completed', 'error'],
'transitions': {
'initial': ['running'],
'running': ['confirming', 'installing', 'completed', 'error'],
'confirming': ['installing', 'cancelled'],
'installing': ['completed', 'error'],
'completed': [],
'error': [],
'cancelled': []
}
},
'ssh': {
'states': ['initial', 'connecting', 'authenticating', 'connected', 'error'],
'transitions': {
'initial': ['connecting'],
'connecting': ['authenticating', 'connected', 'error'],
'authenticating': ['connected', 'error'],
'connected': ['error'],
'error': []
}
},
'vim': {
'states': ['initial', 'normal', 'insert', 'visual', 'command', 'exiting'],
'transitions': {
'initial': ['normal', 'insert'],
'normal': ['insert', 'visual', 'command', 'exiting'],
'insert': ['normal'],
'visual': ['normal'],
'command': ['normal', 'exiting'],
'exiting': []
}
}
}
def detect_prompt(self, output, process_type='generic'):
"""Detect what type of prompt is present in the output."""
detections = {}
# Check all pattern categories
for category, patterns in self.prompt_patterns.items():
for pattern in patterns:
if pattern.search(output):
if category not in detections:
detections[category] = []
detections[category].append(pattern.pattern)
# Process type specific detection
if process_type in self.prompt_patterns:
for pattern in self.prompt_patterns[process_type]:
if pattern.search(output):
detections[process_type] = detections.get(process_type, [])
detections[process_type].append(pattern.pattern)
return detections
def get_response_suggestions(self, prompt_detections, process_type='generic'):
"""Get suggested responses based on detected prompts."""
suggestions = []
for category, patterns in prompt_detections.items():
if category == 'confirmation':
suggestions.extend(['y', 'yes', 'n', 'no'])
elif category == 'password':
suggestions.append('<password>')
elif category == 'sudo_password':
suggestions.append('<sudo_password>')
elif category == 'apt':
if any('continue' in p for p in patterns):
suggestions.extend(['y', 'yes'])
elif category == 'vim':
if any(':' in p for p in patterns):
suggestions.extend(['w', 'q', 'wq', 'q!'])
elif any('ENTER' in p for p in patterns):
suggestions.append('\n')
elif category == 'ssh':
if any('yes/no' in p for p in patterns):
suggestions.extend(['yes', 'no'])
elif any('password' in p for p in patterns):
suggestions.append('<password>')
elif category == 'bash_prompt':
suggestions.extend(['help', 'ls', 'pwd', 'exit'])
return list(set(suggestions)) # Remove duplicates
def update_session_state(self, session_name, output, process_type='generic'):
"""Update the state machine for a session based on output."""
if session_name not in self.session_states:
self.session_states[session_name] = {
'current_state': 'initial',
'process_type': process_type,
'last_activity': time.time(),
'transitions': []
}
session_state = self.session_states[session_name]
old_state = session_state['current_state']
# Detect prompts and determine new state
detections = self.detect_prompt(output, process_type)
new_state = self._determine_state_from_detections(detections, process_type, old_state)
if new_state != old_state:
session_state['transitions'].append({
'from': old_state,
'to': new_state,
'timestamp': time.time(),
'trigger': detections
})
session_state['current_state'] = new_state
session_state['last_activity'] = time.time()
return new_state
def _determine_state_from_detections(self, detections, process_type, current_state):
"""Determine new state based on prompt detections."""
if process_type in self.state_machines:
state_machine = self.state_machines[process_type]
# State transition logic based on detections
if 'confirmation' in detections and current_state in ['running', 'initial']:
return 'confirming'
elif 'password' in detections or 'sudo_password' in detections:
return 'authenticating'
elif 'error' in detections:
return 'error'
elif 'bash_prompt' in detections and current_state != 'initial':
return 'connected' if process_type == 'ssh' else 'completed'
elif 'vim' in detections:
if any('-- INSERT --' in p for p in detections.get('vim', [])):
return 'insert'
elif any('-- VISUAL --' in p for p in detections.get('vim', [])):
return 'visual'
elif any(':' in p for p in detections.get('vim', [])):
return 'command'
# Default state progression
if current_state == 'initial':
return 'running'
elif current_state == 'running' and detections:
return 'waiting_input'
elif current_state == 'waiting_input' and not detections:
return 'running'
return current_state
def is_waiting_for_input(self, session_name):
"""Check if a session is currently waiting for input."""
if session_name not in self.session_states:
return False
state = self.session_states[session_name]['current_state']
process_type = self.session_states[session_name]['process_type']
# States that typically indicate waiting for input
waiting_states = {
'generic': ['waiting_input'],
'apt': ['confirming'],
'ssh': ['authenticating'],
'vim': ['command', 'insert', 'visual']
}
return state in waiting_states.get(process_type, [])
def get_session_timeout(self, session_name):
"""Get the timeout for a session based on its process type."""
if session_name not in self.session_states:
return self.timeout_configs['default']
process_type = self.session_states[session_name]['process_type']
return self.timeout_configs.get(process_type, self.timeout_configs['default'])
def check_for_timeouts(self):
"""Check all sessions for timeouts and return timed out sessions."""
timed_out = []
current_time = time.time()
for session_name, state in self.session_states.items():
timeout = self.get_session_timeout(session_name)
if current_time - state['last_activity'] > timeout:
timed_out.append(session_name)
return timed_out
def get_session_info(self, session_name):
"""Get information about a session's state."""
if session_name not in self.session_states:
return None
state = self.session_states[session_name]
return {
'current_state': state['current_state'],
'process_type': state['process_type'],
'last_activity': state['last_activity'],
'transitions': state['transitions'][-5:], # Last 5 transitions
'is_waiting': self.is_waiting_for_input(session_name)
}
# Global detector instance
_detector = None
def get_global_detector():
"""Get the global prompt detector instance."""
global _detector
if _detector is None:
_detector = PromptDetector()
return _detector