import re import time class PromptDetector: """Detects various process prompts and manages interaction state.""" def __init__(self): self.prompt_patterns = self._load_prompt_patterns() self.state_machines = self._load_state_machines() self.session_states = {} self.timeout_configs = { "default": 30, # 30 seconds default timeout "apt": 300, # 5 minutes for apt operations "ssh": 60, # 1 minute for SSH connections "vim": 3600, # 1 hour for vim sessions } def _load_prompt_patterns(self): """Load regex patterns for detecting various prompts.""" return { "bash_prompt": [ re.compile(r"[\w\-\.]+@[\w\-\.]+:.*[\$#]\s*$"), re.compile(r"\$\s*$"), re.compile(r"#\s*$"), re.compile(r">\s*$"), # Continuation prompt ], "confirmation": [ re.compile(r"[Yy]/[Nn]", re.IGNORECASE), re.compile(r"[Yy]es/[Nn]o", re.IGNORECASE), re.compile(r"continue\?", re.IGNORECASE), re.compile(r"proceed\?", re.IGNORECASE), ], "password": [ re.compile(r"password:", re.IGNORECASE), re.compile(r"passphrase:", re.IGNORECASE), re.compile(r"enter password", re.IGNORECASE), ], "sudo_password": [re.compile(r"\[sudo\].*password", re.IGNORECASE)], "apt": [ re.compile(r"Do you want to continue\?", re.IGNORECASE), re.compile(r"After this operation", re.IGNORECASE), re.compile(r"Need to get", re.IGNORECASE), ], "vim": [ re.compile(r"-- INSERT --"), re.compile(r"-- VISUAL --"), re.compile(r":"), re.compile(r"Press ENTER", re.IGNORECASE), ], "ssh": [ re.compile(r"yes/no", re.IGNORECASE), re.compile(r"password:", re.IGNORECASE), re.compile(r"Permission denied", re.IGNORECASE), ], "git": [ re.compile(r"Username:", re.IGNORECASE), re.compile(r"Email:", re.IGNORECASE), ], "error": [ re.compile(r"error:", re.IGNORECASE), re.compile(r"failed", re.IGNORECASE), re.compile(r"exception", re.IGNORECASE), ], } def _load_state_machines(self): """Load state machines for different process types.""" return { "apt": { "states": [ "initial", "running", "confirming", "installing", "completed", "error", ], "transitions": { "initial": ["running"], "running": ["confirming", "installing", "completed", "error"], "confirming": ["installing", "cancelled"], "installing": ["completed", "error"], "completed": [], "error": [], "cancelled": [], }, }, "ssh": { "states": [ "initial", "connecting", "authenticating", "connected", "error", ], "transitions": { "initial": ["connecting"], "connecting": ["authenticating", "connected", "error"], "authenticating": ["connected", "error"], "connected": ["error"], "error": [], }, }, "vim": { "states": [ "initial", "normal", "insert", "visual", "command", "exiting", ], "transitions": { "initial": ["normal", "insert"], "normal": ["insert", "visual", "command", "exiting"], "insert": ["normal"], "visual": ["normal"], "command": ["normal", "exiting"], "exiting": [], }, }, } def detect_prompt(self, output, process_type="generic"): """Detect what type of prompt is present in the output.""" detections = {} # Check all pattern categories for category, patterns in self.prompt_patterns.items(): for pattern in patterns: if pattern.search(output): if category not in detections: detections[category] = [] detections[category].append(pattern.pattern) # Process type specific detection if process_type in self.prompt_patterns: for pattern in self.prompt_patterns[process_type]: if pattern.search(output): detections[process_type] = detections.get(process_type, []) detections[process_type].append(pattern.pattern) return detections def get_response_suggestions(self, prompt_detections, process_type="generic"): """Get suggested responses based on detected prompts.""" suggestions = [] for category, patterns in prompt_detections.items(): if category == "confirmation": suggestions.extend(["y", "yes", "n", "no"]) elif category == "password": suggestions.append("") elif category == "sudo_password": suggestions.append("") elif category == "apt": if any("continue" in p for p in patterns): suggestions.extend(["y", "yes"]) elif category == "vim": if any(":" in p for p in patterns): suggestions.extend(["w", "q", "wq", "q!"]) elif any("ENTER" in p for p in patterns): suggestions.append("\n") elif category == "ssh": if any("yes/no" in p for p in patterns): suggestions.extend(["yes", "no"]) elif any("password" in p for p in patterns): suggestions.append("") elif category == "bash_prompt": suggestions.extend(["help", "ls", "pwd", "exit"]) return list(set(suggestions)) # Remove duplicates def update_session_state(self, session_name, output, process_type="generic"): """Update the state machine for a session based on output.""" if session_name not in self.session_states: self.session_states[session_name] = { "current_state": "initial", "process_type": process_type, "last_activity": time.time(), "transitions": [], } session_state = self.session_states[session_name] old_state = session_state["current_state"] # Detect prompts and determine new state detections = self.detect_prompt(output, process_type) new_state = self._determine_state_from_detections(detections, process_type, old_state) if new_state != old_state: session_state["transitions"].append( { "from": old_state, "to": new_state, "timestamp": time.time(), "trigger": detections, } ) session_state["current_state"] = new_state session_state["last_activity"] = time.time() return new_state def _determine_state_from_detections(self, detections, process_type, current_state): """Determine new state based on prompt detections.""" if process_type in self.state_machines: self.state_machines[process_type] # State transition logic based on detections if "confirmation" in detections and current_state in ["running", "initial"]: return "confirming" elif "password" in detections or "sudo_password" in detections: return "authenticating" elif "error" in detections: return "error" elif "bash_prompt" in detections and current_state != "initial": return "connected" if process_type == "ssh" else "completed" elif "vim" in detections: if any("-- INSERT --" in p for p in detections.get("vim", [])): return "insert" elif any("-- VISUAL --" in p for p in detections.get("vim", [])): return "visual" elif any(":" in p for p in detections.get("vim", [])): return "command" # Default state progression if current_state == "initial": return "running" elif current_state == "running" and detections: return "waiting_input" elif current_state == "waiting_input" and not detections: return "running" return current_state def is_waiting_for_input(self, session_name): """Check if a session is currently waiting for input.""" if session_name not in self.session_states: return False state = self.session_states[session_name]["current_state"] process_type = self.session_states[session_name]["process_type"] # States that typically indicate waiting for input waiting_states = { "generic": ["waiting_input"], "apt": ["confirming"], "ssh": ["authenticating"], "vim": ["command", "insert", "visual"], } return state in waiting_states.get(process_type, []) def get_session_timeout(self, session_name): """Get the timeout for a session based on its process type.""" if session_name not in self.session_states: return self.timeout_configs["default"] process_type = self.session_states[session_name]["process_type"] return self.timeout_configs.get(process_type, self.timeout_configs["default"]) def check_for_timeouts(self): """Check all sessions for timeouts and return timed out sessions.""" timed_out = [] current_time = time.time() for session_name, state in self.session_states.items(): timeout = self.get_session_timeout(session_name) if current_time - state["last_activity"] > timeout: timed_out.append(session_name) return timed_out def get_session_info(self, session_name): """Get information about a session's state.""" if session_name not in self.session_states: return None state = self.session_states[session_name] return { "current_state": state["current_state"], "process_type": state["process_type"], "last_activity": state["last_activity"], "transitions": state["transitions"][-5:], # Last 5 transitions "is_waiting": self.is_waiting_for_input(session_name), } # Global detector instance _detector = None def get_global_detector(): """Get the global prompt detector instance.""" global _detector if _detector is None: _detector = PromptDetector() return _detector