import re
import time
class PromptDetector:
    """Detect interactive prompts in process output and track session state.

    Attributes:
        prompt_patterns: Maps a category name (e.g. ``"password"``) to a list
            of compiled regular expressions.
        state_machines: Maps a process type to a dict with ``"states"`` and
            ``"transitions"`` tables. These tables are reference data; the
            state inference in this class does not enforce the transitions.
        session_states: Maps a session name to its state record (created
            lazily by :meth:`update_session_state`).
        timeout_configs: Maps a process type to an inactivity timeout in
            seconds; ``"default"`` is used for any other process type.
    """

    def __init__(self):
        self.prompt_patterns = self._load_prompt_patterns()
        self.state_machines = self._load_state_machines()
        self.session_states = {}
        self.timeout_configs = {
            "default": 30,  # 30 seconds default timeout
            "apt": 300,  # 5 minutes for apt operations
            "ssh": 60,  # 1 minute for SSH connections
            "vim": 3600,  # 1 hour for vim sessions
        }

    def _load_prompt_patterns(self):
        """Return the category -> compiled regex list table used for detection."""
        return {
            "bash_prompt": [
                re.compile(r"[\w\-\.]+@[\w\-\.]+:.*[\$#]\s*$"),
                re.compile(r"\$\s*$"),
                re.compile(r"#\s*$"),
                re.compile(r">\s*$"),  # Continuation prompt
            ],
            "confirmation": [
                re.compile(r"[Yy]/[Nn]", re.IGNORECASE),
                re.compile(r"[Yy]es/[Nn]o", re.IGNORECASE),
                re.compile(r"continue\?", re.IGNORECASE),
                re.compile(r"proceed\?", re.IGNORECASE),
            ],
            "password": [
                re.compile(r"password:", re.IGNORECASE),
                re.compile(r"passphrase:", re.IGNORECASE),
                re.compile(r"enter password", re.IGNORECASE),
            ],
            "sudo_password": [re.compile(r"\[sudo\].*password", re.IGNORECASE)],
            "apt": [
                re.compile(r"Do you want to continue\?", re.IGNORECASE),
                re.compile(r"After this operation", re.IGNORECASE),
                re.compile(r"Need to get", re.IGNORECASE),
            ],
            "vim": [
                re.compile(r"-- INSERT --"),
                re.compile(r"-- VISUAL --"),
                # NOTE: matches any colon anywhere in the output, so this
                # category fires on most text; state inference therefore only
                # trusts it for sessions whose process type is "vim".
                re.compile(r":"),
                re.compile(r"Press ENTER", re.IGNORECASE),
            ],
            "ssh": [
                re.compile(r"yes/no", re.IGNORECASE),
                re.compile(r"password:", re.IGNORECASE),
                re.compile(r"Permission denied", re.IGNORECASE),
            ],
            "git": [
                re.compile(r"Username:", re.IGNORECASE),
                re.compile(r"Email:", re.IGNORECASE),
            ],
            "error": [
                re.compile(r"error:", re.IGNORECASE),
                re.compile(r"failed", re.IGNORECASE),
                re.compile(r"exception", re.IGNORECASE),
            ],
        }

    def _load_state_machines(self):
        """Return the per-process-type state and transition tables."""
        return {
            "apt": {
                "states": [
                    "initial",
                    "running",
                    "confirming",
                    "installing",
                    "completed",
                    "error",
                    # Listed because "confirming" can transition to it below.
                    "cancelled",
                ],
                "transitions": {
                    "initial": ["running"],
                    "running": ["confirming", "installing", "completed", "error"],
                    "confirming": ["installing", "cancelled"],
                    "installing": ["completed", "error"],
                    "completed": [],
                    "error": [],
                    "cancelled": [],
                },
            },
            "ssh": {
                "states": [
                    "initial",
                    "connecting",
                    "authenticating",
                    "connected",
                    "error",
                ],
                "transitions": {
                    "initial": ["connecting"],
                    "connecting": ["authenticating", "connected", "error"],
                    "authenticating": ["connected", "error"],
                    "connected": ["error"],
                    "error": [],
                },
            },
            "vim": {
                "states": [
                    "initial",
                    "normal",
                    "insert",
                    "visual",
                    "command",
                    "exiting",
                ],
                "transitions": {
                    "initial": ["normal", "insert"],
                    "normal": ["insert", "visual", "command", "exiting"],
                    "insert": ["normal"],
                    "visual": ["normal"],
                    "command": ["normal", "exiting"],
                    "exiting": [],
                },
            },
        }

    def detect_prompt(self, output, process_type="generic"):
        """Return the prompt categories whose patterns match ``output``.

        Args:
            output: Text captured from the process. Empty or ``None`` output
                yields no detections.
            process_type: Accepted for interface compatibility. Every category
                (including the one named after the process type) is always
                scanned, so this argument does not change the result.

        Returns:
            Dict mapping each matching category to the list of regex source
            strings that matched, each listed once, in table order.
        """
        if not output:
            return {}
        detections = {}
        for category, patterns in self.prompt_patterns.items():
            matched = [p.pattern for p in patterns if p.search(output)]
            if matched:
                detections[category] = matched
        return detections

    def get_response_suggestions(self, prompt_detections, process_type="generic"):
        """Suggest replies for the prompts reported by :meth:`detect_prompt`.

        Args:
            prompt_detections: Dict of category -> matched regex source strings.
            process_type: Accepted for interface compatibility; unused.

        Returns:
            List of unique suggestion strings in first-seen order.
        """
        suggestions = []
        for category, patterns in prompt_detections.items():
            if category == "confirmation":
                suggestions.extend(["y", "yes", "n", "no"])
            elif category == "password":
                suggestions.append("<password>")
            elif category == "sudo_password":
                suggestions.append("<sudo_password>")
            elif category == "apt":
                if any("continue" in p for p in patterns):
                    suggestions.extend(["y", "yes"])
            elif category == "vim":
                if any(":" in p for p in patterns):
                    suggestions.extend(["w", "q", "wq", "q!"])
                elif any("ENTER" in p for p in patterns):
                    suggestions.append("\n")
            elif category == "ssh":
                if any("yes/no" in p for p in patterns):
                    suggestions.extend(["yes", "no"])
                elif any("password" in p for p in patterns):
                    suggestions.append("<password>")
            elif category == "bash_prompt":
                suggestions.extend(["help", "ls", "pwd", "exit"])
        # dict.fromkeys de-duplicates while keeping a stable order; a set
        # would make the order vary between interpreter runs.
        return list(dict.fromkeys(suggestions))

    def update_session_state(self, session_name, output, process_type="generic"):
        """Advance a session's state from new output and return the new state.

        A record is created on first use of ``session_name`` (starting in
        ``"initial"`` with the given ``process_type``). Each state change is
        appended to the record's ``"transitions"`` list, and
        ``"last_activity"`` is refreshed on every call.

        Args:
            session_name: Key identifying the session.
            output: Latest text captured from the process.
            process_type: Process type used for detection and state inference
                on this call; stored in the record only when it is created.

        Returns:
            The session's state name after processing ``output``.
        """
        now = time.time()
        session_state = self.session_states.get(session_name)
        if session_state is None:
            session_state = {
                "current_state": "initial",
                "process_type": process_type,
                "last_activity": now,
                "transitions": [],
            }
            self.session_states[session_name] = session_state

        old_state = session_state["current_state"]
        detections = self.detect_prompt(output, process_type)
        new_state = self._determine_state_from_detections(
            detections, process_type, old_state
        )
        if new_state != old_state:
            session_state["transitions"].append(
                {
                    "from": old_state,
                    "to": new_state,
                    "timestamp": now,
                    "trigger": detections,
                }
            )
            session_state["current_state"] = new_state
        session_state["last_activity"] = now
        return new_state

    def _determine_state_from_detections(self, detections, process_type, current_state):
        """Infer the next state name from the detections of one output chunk.

        Checks are ordered by priority: confirmation, password, error, shell
        prompt, then vim mode indicators; if none apply, a default
        initial -> running -> waiting_input progression is used.

        Args:
            detections: Result of :meth:`detect_prompt`.
            process_type: Process type of the session.
            current_state: State name before this output was seen.

        Returns:
            The inferred state name (``current_state`` when nothing changes).
        """
        if "confirmation" in detections and current_state in ("running", "initial"):
            return "confirming"
        if "password" in detections or "sudo_password" in detections:
            return "authenticating"
        if "error" in detections:
            return "error"
        if "bash_prompt" in detections and current_state != "initial":
            return "connected" if process_type == "ssh" else "completed"
        # Vim mode states only make sense for vim sessions: the vim ":"
        # pattern matches any colon, which would otherwise push apt/ssh/
        # generic sessions into the vim-only "command" state.
        if process_type == "vim" and "vim" in detections:
            vim_hits = detections["vim"]
            if any("-- INSERT --" in p for p in vim_hits):
                return "insert"
            if any("-- VISUAL --" in p for p in vim_hits):
                return "visual"
            if any(":" in p for p in vim_hits):
                return "command"

        # Default state progression
        if current_state == "initial":
            return "running"
        if current_state == "running" and detections:
            return "waiting_input"
        if current_state == "waiting_input" and not detections:
            return "running"
        return current_state

    def is_waiting_for_input(self, session_name):
        """Return True if the session's current state expects user input.

        ``"confirming"`` and ``"authenticating"`` count as waiting for every
        process type, because they are only entered on a confirmation or
        password prompt. Process types without their own entry below are
        treated like ``"generic"``. Unknown sessions return False.
        """
        record = self.session_states.get(session_name)
        if record is None:
            return False
        state = record["current_state"]
        if state in ("confirming", "authenticating"):
            return True
        # Additional waiting states specific to a process type.
        extra_waiting = {
            "generic": ("waiting_input",),
            "apt": (),
            "ssh": (),
            "vim": ("command", "insert", "visual"),
        }
        return state in extra_waiting.get(
            record["process_type"], extra_waiting["generic"]
        )

    def get_session_timeout(self, session_name):
        """Return the inactivity timeout in seconds for a session.

        Falls back to the ``"default"`` timeout when the session is unknown
        or its process type has no dedicated entry in ``timeout_configs``.
        """
        default_timeout = self.timeout_configs["default"]
        record = self.session_states.get(session_name)
        if record is None:
            return default_timeout
        return self.timeout_configs.get(record["process_type"], default_timeout)

    def check_for_timeouts(self):
        """Return the names of sessions idle for longer than their timeout."""
        current_time = time.time()
        return [
            name
            for name, record in self.session_states.items()
            if current_time - record["last_activity"] > self.get_session_timeout(name)
        ]

    def get_session_info(self, session_name):
        """Return a summary dict for a session, or None if it is unknown.

        The summary holds the current state, process type, last-activity
        timestamp, the five most recent transitions and an ``"is_waiting"``
        flag computed by :meth:`is_waiting_for_input`.
        """
        record = self.session_states.get(session_name)
        if record is None:
            return None
        return {
            "current_state": record["current_state"],
            "process_type": record["process_type"],
            "last_activity": record["last_activity"],
            "transitions": record["transitions"][-5:],  # Last 5 transitions
            "is_waiting": self.is_waiting_for_input(session_name),
        }
# Module-wide detector shared by all callers; created lazily on first request.
_detector = None


def get_global_detector():
    """Return the shared PromptDetector, constructing it on the first call."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = PromptDetector()
    return _detector