#!/usr/bin/env python3 """ Production-ready WebSocket Browser Server with Playwright-compatible API Manages multiple WebKit2GTK browser instances with improved concurrency. This server is designed to work with various WebKit2 versions and automatically adapts to the available API. It supports both the original command API and the new Playwright-compatible API for maximum flexibility. WebKit2 Version Compatibility: - WebKit2 4.1: Full support with newer evaluate_javascript API - WebKit2 4.0: Full support with run_javascript API - WebKit2 3.0: Basic support with fallback mechanisms The server automatically detects the available WebKit2 version and uses appropriate methods for JavaScript execution and result extraction. """ import gi gi.require_version('Gtk', '3.0') # Attempt to load WebKit2 with version fallback webkit_loaded = False webkit_version = None for version in ['4.1', '4.0', '3.0']: try: gi.require_version('WebKit2', version) webkit_loaded = True webkit_version = version break except ValueError: continue if not webkit_loaded: print("Error: WebKit2 is not installed or available.") print("Please install it using:") print(" Ubuntu/Debian: sudo apt-get install gir1.2-webkit2-4.0") print(" Fedora: sudo dnf install webkit2gtk3") print(" Arch: sudo pacman -S webkit2gtk") exit(1) from gi.repository import Gtk, WebKit2, GLib, Gdk import asyncio import websockets import json import threading import uuid import time import base64 import logging from typing import Dict, Optional, Any, List from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import traceback import weakref import re # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) print(f"Successfully loaded WebKit2 version {webkit_version}") @dataclass class PendingCallback: """Container for pending JavaScript callbacks""" request_id: str timestamp: float timeout: float = 30.0 @property def is_expired(self) -> bool: return time.time() - self.timestamp > self.timeout class RemoteBrowser: """ A WebKit2GTK browser instance that can be controlled via WebSocket. Each connection gets its own browser window with improved concurrency. """ def __init__(self, connection_id: str, websocket, server, loop: asyncio.AbstractEventLoop): self.connection_id = connection_id self.websocket = websocket self.server = server self.loop = loop # The asyncio event loop from the main thread self.window = None self.webview = None self.pending_callbacks: Dict[str, PendingCallback] = {} self.is_closing = False self.is_ready = False self._lock = asyncio.Lock() self._cleanup_task = None self._gtk_lock = threading.Lock() self._browser_created = threading.Event() self._creation_error = None async def start(self): """Start the browser and cleanup task""" try: # Start cleanup task for expired callbacks self._cleanup_task = asyncio.create_task(self._cleanup_expired_callbacks()) except Exception as e: logger.error(f"Error starting browser tasks: {e}") raise async def _cleanup_expired_callbacks(self): """Periodically clean up expired callbacks""" while not self.is_closing: try: await asyncio.sleep(5) # Check every 5 seconds async with self._lock: expired_ids = [ callback_id for callback_id, callback in self.pending_callbacks.items() if callback.is_expired ] for callback_id in expired_ids: try: async with self._lock: callback = self.pending_callbacks.pop(callback_id, None) if callback: logger.warning(f"Command timed out for request_id: {callback.request_id}") await self.send_response( callback.request_id, None, f"Command for request_id {callback.request_id} timed out" ) except Exception as e: logger.error(f"Error handling expired callback {callback_id}: {e}") except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in cleanup task: {e}") def create_browser(self): """Create the browser window in the GTK thread.""" try: with self._gtk_lock: if self.is_closing: return self.window = Gtk.Window(title=f"Remote Browser - {self.connection_id[:8]}") self.window.set_default_size(1280, 720) self.window.connect("destroy", self.on_window_destroy) # Set window properties for stability self.window.set_resizable(True) self.window.set_deletable(True) main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) self.window.add(main_box) nav_bar = self.create_navigation_bar() main_box.pack_start(nav_bar, False, False, 0) self.webview = WebKit2.WebView() if not self.setup_webview(): raise Exception("Failed to setup WebView") scrolled_window = Gtk.ScrolledWindow() scrolled_window.add(self.webview) main_box.pack_start(scrolled_window, True, True, 0) self.status_bar = Gtk.Statusbar() self.status_context = self.status_bar.get_context_id("connection") self.status_bar.push(self.status_context, f"Connected: {self.connection_id[:8]}") main_box.pack_start(self.status_bar, False, False, 0) self.window.show_all() self.window.present() self.is_ready = True self._browser_created.set() GLib.idle_add(self._notify_ready) except Exception as e: logger.error(f"Error creating browser window: {e}") self._creation_error = str(e) self._browser_created.set() GLib.idle_add(self._notify_error, str(e)) def _notify_ready(self): """Notify that browser is ready (called from GTK thread).""" if not self.is_closing: try: coro = self.send_event("browser_ready", {"connection_id": self.connection_id}) asyncio.run_coroutine_threadsafe(coro, self.loop) except: pass def _execute_wrapped_js(self, script: str, request_id: str): """Execute JavaScript wrapped to always return a string for maximum compatibility.""" def js_finished(webview, task, user_data): try: # Try to get the result string directly result_str = None try: # For very old WebKit2 versions, try the simple approach result = webview.run_javascript_finish(task) result_str = str(result) # Check if the result looks like our wrapped response if result_str and ('"success":true' in result_str or '"success":false' in result_str): # Extract the JSON from the string representation import re json_match = re.search(r'\{.*\}', result_str) if json_match: result_str = json_match.group(0) except Exception as e: logger.debug(f"Failed to get result string: {e}") if result_str: try: parsed = json.loads(result_str) if parsed.get("success"): value = parsed.get("value") else: value = None error_msg = parsed.get("error", "Unknown error") coro = self.send_response(request_id, value, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) return except json.JSONDecodeError: value = result_str else: value = None coro = self.send_response(request_id, value) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"Error in wrapped JS execution: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) try: self.webview.run_javascript(script, None, js_finished, None) except Exception as e: error_msg = f"Failed to execute wrapped JavaScript: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) def _execute_wait_check(self, script: str, check_id: str, callback): """Execute a simple check for wait operations.""" def js_finished(webview, task, user_data): try: result_str = None try: result = webview.run_javascript_finish(task) # Extract the JSON string from the result if hasattr(result, 'get_js_value'): js_value = result.get_js_value() if hasattr(js_value, 'to_string'): result_str = js_value.to_string() elif hasattr(result, 'get_value'): result_str = result.get_value() else: result_str = str(result) # Clean up string representation import re json_match = re.search(r'\{.*\}', result_str) if json_match: result_str = json_match.group(0) except Exception as e: logger.debug(f"Failed to get wait check result: {e}") callback(result_str) except Exception as e: logger.error(f"Error in wait check: {e}") callback(None) try: self.webview.run_javascript(script, None, js_finished, None) except Exception as e: logger.error(f"Failed to execute wait check: {e}") callback(None) def _notify_error(self, error_msg): """Notify about browser creation error (called from GTK thread).""" if not self.is_closing: try: coro = self.send_event("browser_error", {"error": error_msg}) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: logger.error(f"Error notifying browser error: {e}") def create_navigation_bar(self): """Create a simple navigation bar.""" try: nav_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) nav_bar.set_margin_top(5) nav_bar.set_margin_bottom(5) # Use newer margin methods instead of deprecated ones nav_bar.set_margin_start(5) nav_bar.set_margin_end(5) self.url_label = Gtk.Label() self.url_label.set_ellipsize(True) self.url_label.set_max_width_chars(50) nav_bar.pack_start(self.url_label, True, True, 0) return nav_bar except Exception as e: logger.error(f"Error creating navigation bar: {e}") # Return minimal nav bar on error return Gtk.Box() def setup_webview(self) -> bool: """Configure WebKit view settings.""" try: if not self.webview: return False settings = self.webview.get_settings() settings.set_enable_developer_extras(True) settings.set_enable_javascript(True) settings.set_allow_file_access_from_file_urls(True) settings.set_enable_webgl(True) settings.set_enable_media_stream(True) # Only set non-deprecated security settings try: # These might be available in newer versions if hasattr(settings, 'set_enable_java'): settings.set_enable_java(False) if hasattr(settings, 'set_enable_plugins'): settings.set_enable_plugins(False) except: # Ignore if not available pass # Connect signals with weak references to avoid circular references self.webview.connect("load-changed", self.on_load_changed) self.webview.connect("notify::uri", self.on_uri_changed) self.webview.connect("load-failed", self.on_load_failed) self.webview.connect("notify::title", self.on_title_changed) return True except Exception as e: logger.error(f"Error setting up webview: {e}") return False def on_window_destroy(self, widget): """Handle window close event.""" if not self.is_closing: self.is_closing = True try: asyncio.run_coroutine_threadsafe(self.close(), self.loop) except Exception as e: logger.error(f"Error scheduling close: {e}") def on_load_changed(self, webview, load_event): """Handle page load events (called from GTK thread).""" if self.is_closing: return try: event_data = {} # Safely get URI and title try: uri = webview.get_uri() if uri: event_data["url"] = uri except: pass try: title = webview.get_title() if title: event_data["title"] = title except: pass if load_event == WebKit2.LoadEvent.FINISHED: coro = self.send_event("load_finished", event_data) asyncio.run_coroutine_threadsafe(coro, self.loop) elif load_event == WebKit2.LoadEvent.STARTED: coro = self.send_event("load_started", event_data) asyncio.run_coroutine_threadsafe(coro, self.loop) elif load_event == WebKit2.LoadEvent.COMMITTED: coro = self.send_event("load_committed", event_data) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: logger.error(f"Error in load changed handler: {e}") def on_uri_changed(self, webview, param): """Update URL label when URI changes.""" if self.is_closing: return try: uri = webview.get_uri() if uri and self.url_label: self.url_label.set_text(uri) except Exception as e: logger.error(f"Error updating URI: {e}") def on_title_changed(self, webview, param): """Handle title changes (called from GTK thread).""" if self.is_closing: return try: title = webview.get_title() uri = None try: uri = webview.get_uri() except: pass if title: event_data = {"title": title} if uri: event_data["url"] = uri coro = self.send_event("title_changed", event_data) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: logger.error(f"Error in title changed handler: {e}") def on_load_failed(self, webview, load_event, failing_uri, error): """Handle load failures (called from GTK thread).""" if self.is_closing: return try: error_msg = error.message if error else "Unknown error" coro = self.send_event("load_failed", {"url": failing_uri, "error": error_msg}) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: logger.error(f"Error in load failed handler: {e}") async def send_event(self, event_type: str, data: dict): """Send an event to the WebSocket client.""" if self.is_closing or not self.websocket: return try: async with self._lock: if self.websocket.state == websockets.protocol.State.OPEN: await self.websocket.send(json.dumps({ "type": "event", "event": event_type, "data": data, "timestamp": time.time() })) except websockets.exceptions.ConnectionClosed: self.is_closing = True except Exception as e: logger.error(f"Error sending event {event_type}: {e}") async def send_response(self, request_id: str, result: Any, error: Optional[str] = None): """Send a response to a command.""" if self.is_closing or not self.websocket: return try: async with self._lock: if self.websocket.state == websockets.protocol.State.OPEN: await self.websocket.send(json.dumps({ "type": "response", "request_id": request_id, "result": result, "error": error, "timestamp": time.time() })) except websockets.exceptions.ConnectionClosed: self.is_closing = True except Exception as e: logger.error(f"Error sending response for {request_id}: {e}") def execute_javascript(self, script: str, request_id: str): """Execute JavaScript and send result back via WebSocket.""" if not self.webview or self.is_closing: coro = self.send_response(request_id, None, "Browser not ready") asyncio.run_coroutine_threadsafe(coro, self.loop) return # Store callback info with lock try: with self._gtk_lock: self.pending_callbacks[request_id] = PendingCallback(request_id, time.time()) except Exception as e: logger.error(f"Error storing callback: {e}") coro = self.send_response(request_id, None, str(e)) asyncio.run_coroutine_threadsafe(coro, self.loop) return def js_finished(webview, task, user_data): try: with self._gtk_lock: callback = self.pending_callbacks.pop(request_id, None) if not callback: return # Timed out or already handled except Exception as e: logger.error(f"Error retrieving callback: {e}") return try: # Different WebKit2 versions have different APIs result = None value = None # Try newer API first if hasattr(webview, 'evaluate_javascript_finish'): try: result = webview.evaluate_javascript_finish(task) except Exception as e: logger.debug(f"evaluate_javascript_finish failed: {e}") result = None # Fallback to older API if result is None and hasattr(webview, 'run_javascript_finish'): try: result = webview.run_javascript_finish(task) except Exception as e: logger.debug(f"run_javascript_finish failed: {e}") result = None if result is None: # If both methods failed, try to return a simple result # This might happen with very old WebKit2 versions logger.info("Falling back to JSON string execution for compatibility") # Wrap the script to always return a JSON string wrapped_script = f""" (function() {{ try {{ var result = ({script}); return JSON.stringify({{success: true, value: result}}); }} catch (e) {{ return JSON.stringify({{success: false, error: e.toString()}}); }} }})() """ self._execute_wrapped_js(wrapped_script, request_id) return # Try to extract the value from the result # Different WebKit2 versions use different methods if hasattr(result, 'get_js_value'): js_result = result.get_js_value() if hasattr(js_result, 'is_string') and js_result.is_string(): value = js_result.to_string() elif hasattr(js_result, 'is_number') and js_result.is_number(): value = js_result.to_double() elif hasattr(js_result, 'is_boolean') and js_result.is_boolean(): value = js_result.to_boolean() elif hasattr(js_result, 'is_object') and (js_result.is_object() or js_result.is_array()): # Handle JSON serialization json_script = f"JSON.stringify(({script}))" self._execute_simple_js(json_script, request_id) return else: value = None elif hasattr(result, 'get_value'): # Older WebKit2 API try: value = result.get_value() except: # If get_value fails, try JSON serialization json_script = f"JSON.stringify(({script}))" self._execute_simple_js(json_script, request_id) return else: # Last resort: try JSON serialization json_script = f"JSON.stringify(({script}))" self._execute_simple_js(json_script, request_id) return coro = self.send_response(request_id, value) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"JavaScript execution error: {str(e)}" logger.error(f"{error_msg} - Script: {script[:100]}...") # Try simple JSON serialization as fallback try: json_script = f"JSON.stringify(({script}))" self._execute_simple_js(json_script, request_id) except: coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) try: # Use newer evaluate_javascript method if available if hasattr(self.webview, 'evaluate_javascript'): self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None) else: # Fallback to deprecated method self.webview.run_javascript(script, None, js_finished, None) except Exception as e: with self._gtk_lock: self.pending_callbacks.pop(request_id, None) error_msg = f"Failed to execute JavaScript: {str(e)}" logger.error(f"{error_msg} - Script: {script[:100]}...") coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) def _execute_simple_js(self, script: str, request_id: str): """Execute simple JavaScript that returns a string result.""" def js_finished(webview, task, user_data): try: # Different WebKit2 versions have different APIs result = None json_str = None # Try newer API first if hasattr(webview, 'evaluate_javascript_finish'): try: result = webview.evaluate_javascript_finish(task) except Exception as e: logger.debug(f"evaluate_javascript_finish failed: {e}") result = None # Fallback to older API if result is None and hasattr(webview, 'run_javascript_finish'): try: result = webview.run_javascript_finish(task) except Exception as e: logger.debug(f"run_javascript_finish failed: {e}") result = None if result is None: error_msg = "Failed to get JavaScript result" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) return # Try to extract the string value if hasattr(result, 'get_js_value'): js_result = result.get_js_value() if hasattr(js_result, 'to_string'): json_str = js_result.to_string() else: json_str = str(js_result) elif hasattr(result, 'get_value'): # Older WebKit2 API try: json_str = result.get_value() except: json_str = str(result) else: # Last resort json_str = str(result) try: value = json.loads(json_str) if json_str else None except json.JSONDecodeError: value = json_str # Return as string if not valid JSON coro = self.send_response(request_id, value) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"Error parsing JavaScript result: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) try: # Use newer evaluate_javascript method if available if hasattr(self.webview, 'evaluate_javascript'): self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None) else: # Fallback to deprecated method self.webview.run_javascript(script, None, js_finished, None) except Exception as e: error_msg = f"Failed to execute simple JavaScript: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) def get_inner_html(self, selector: str, request_id): script = f"document.querySelector({json.dumps(selector)})?.innerHTML || null" self.execute_javascript(script, request_id) def simulate_typing(self, selector: str, text: str, request_id: str, delay: float = 0.1): """Simulate typing into an element. This is now fully async in JS.""" if not self.webview or self.is_closing: coro = self.send_response(request_id, None, "Browser not ready") asyncio.run_coroutine_threadsafe(coro, self.loop) return # Validate inputs if not selector: coro = self.send_response(request_id, None, "Selector cannot be empty") asyncio.run_coroutine_threadsafe(coro, self.loop) return # Sanitize delay delay = max(0, min(delay, 5.0)) # Limit delay between 0 and 5 seconds # Store callback with extended timeout timeout = 30 + len(text) * delay try: with self._gtk_lock: self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=timeout) except Exception as e: logger.error(f"Error storing typing callback: {e}") coro = self.send_response(request_id, None, str(e)) asyncio.run_coroutine_threadsafe(coro, self.loop) return # Create a self-contained async function that returns a stringified result typing_script = f""" (async function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found: {selector}' }}); }} // Check if element is an input or textarea if (!['INPUT', 'TEXTAREA'].includes(element.tagName) && !element.isContentEditable) {{ return JSON.stringify({{ success: false, error: 'Element is not editable' }}); }} element.focus(); const text = {json.dumps(text)}; const delay = {delay * 1000}; // Clear existing value if (element.value !== undefined) {{ element.value = ''; }} for (const char of text) {{ if (element.value !== undefined) {{ element.value += char; }} else if (element.isContentEditable) {{ element.textContent += char; }} element.dispatchEvent(new Event('input', {{ bubbles: true }})); element.dispatchEvent(new KeyboardEvent('keydown', {{ key: char, bubbles: true }})); element.dispatchEvent(new KeyboardEvent('keyup', {{ key: char, bubbles: true }})); if (delay > 0) await new Promise(r => setTimeout(r, delay)); }} // Trigger change event at the end element.dispatchEvent(new Event('change', {{ bubbles: true }})); return JSON.stringify({{ success: true, typed: text.length }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ def typing_finished(webview, task, user_data): try: with self._gtk_lock: callback = self.pending_callbacks.pop(request_id, None) if not callback: return except Exception as e: logger.error(f"Error retrieving typing callback: {e}") return try: # Different WebKit2 versions have different APIs result = None json_str = None # Try newer API first if hasattr(webview, 'evaluate_javascript_finish'): try: result = webview.evaluate_javascript_finish(task) except Exception as e: logger.debug(f"evaluate_javascript_finish failed: {e}") result = None # Fallback to older API if result is None and hasattr(webview, 'run_javascript_finish'): try: result = webview.run_javascript_finish(task) except Exception as e: logger.debug(f"run_javascript_finish failed: {e}") result = None if result is None: value = {"success": False, "error": "Failed to get typing result"} else: # Try to extract the string value if hasattr(result, 'get_js_value'): js_result = result.get_js_value() if hasattr(js_result, 'to_string'): json_str = js_result.to_string() else: json_str = str(js_result) elif hasattr(result, 'get_value'): # Older WebKit2 API try: json_str = result.get_value() except: json_str = str(result) else: # Last resort json_str = str(result) try: value = json.loads(json_str) if json_str else None except json.JSONDecodeError: value = {"success": False, "error": f"Invalid JSON response: {json_str}"} coro = self.send_response(request_id, value) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"Error in typing finished callback: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) try: # Execute the typing script # Use newer evaluate_javascript method if available if hasattr(self.webview, 'evaluate_javascript'): self.webview.evaluate_javascript(typing_script, -1, None, None, None, typing_finished, None) else: # Fallback to deprecated method self.webview.run_javascript(typing_script, None, typing_finished, None) except Exception as e: with self._gtk_lock: self.pending_callbacks.pop(request_id, None) error_msg = f"Failed to start typing simulation: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) def execute_playwright_action(self, action: str, params: dict, request_id: str): """Execute a Playwright-style action.""" # For all Playwright actions, we use the more compatible JSON-wrapped approach # This ensures the result is always a string that can be parsed if action == "click": selector = params.get("selector") options = params.get("options", {}) script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} // Get element position const rect = element.getBoundingClientRect(); const x = rect.left + rect.width / 2; const y = rect.top + rect.height / 2; // Dispatch mouse events const mouseEventInit = {{ bubbles: true, cancelable: true, view: window, clientX: x, clientY: y }}; element.dispatchEvent(new MouseEvent('mousedown', mouseEventInit)); element.dispatchEvent(new MouseEvent('mouseup', mouseEventInit)); element.dispatchEvent(new MouseEvent('click', mouseEventInit)); // Also trigger if it's a link or button if (element.tagName === 'A' || element.tagName === 'BUTTON') {{ element.click(); }} return JSON.stringify({{ success: true }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "fill": selector = params.get("selector") value = params.get("value", "") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} element.focus(); if (element.value !== undefined) {{ element.value = {json.dumps(value)}; }} else if (element.isContentEditable) {{ element.textContent = {json.dumps(value)}; }} else {{ return JSON.stringify({{ success: false, error: 'Element is not fillable' }}); }} element.dispatchEvent(new Event('input', {{ bubbles: true }})); element.dispatchEvent(new Event('change', {{ bubbles: true }})); return JSON.stringify({{ success: true }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "hover": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} const rect = element.getBoundingClientRect(); const x = rect.left + rect.width / 2; const y = rect.top + rect.height / 2; const mouseEvent = new MouseEvent('mouseover', {{ bubbles: true, cancelable: true, view: window, clientX: x, clientY: y }}); element.dispatchEvent(mouseEvent); element.dispatchEvent(new MouseEvent('mouseenter', {{ bubbles: true }})); return JSON.stringify({{ success: true }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "focus": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} element.focus(); return JSON.stringify({{ success: true }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "check": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} if (element.type !== 'checkbox' && element.type !== 'radio') {{ return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }}); }} if (!element.checked) {{ element.checked = true; element.dispatchEvent(new Event('change', {{ bubbles: true }})); }} return JSON.stringify({{ success: true, checked: element.checked }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "uncheck": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} if (element.type !== 'checkbox' && element.type !== 'radio') {{ return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }}); }} if (element.checked) {{ element.checked = false; element.dispatchEvent(new Event('change', {{ bubbles: true }})); }} return JSON.stringify({{ success: true, checked: element.checked }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "select_option": selector = params.get("selector") values = params.get("values", []) if isinstance(values, str): values = [values] script = f""" (function() {{ try {{ const select = document.querySelector({json.dumps(selector)}); if (!select) {{ return JSON.stringify({{ success: false, error: 'Select element not found' }}); }} if (select.tagName !== 'SELECT') {{ return JSON.stringify({{ success: false, error: 'Element is not a select' }}); }} const values = {json.dumps(values)}; const selected = []; // Clear previous selections if not multiple if (!select.multiple) {{ Array.from(select.options).forEach(opt => opt.selected = false); }} // Select new values for (const value of values) {{ const option = Array.from(select.options).find( opt => opt.value === value || opt.text === value ); if (option) {{ option.selected = true; selected.push(option.value); }} }} select.dispatchEvent(new Event('change', {{ bubbles: true }})); return JSON.stringify({{ success: true, selected: selected }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "press": selector = params.get("selector") key = params.get("key") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} element.focus(); const key = {json.dumps(key)}; const keyEvent = new KeyboardEvent('keydown', {{ key: key, code: key, bubbles: true, cancelable: true }}); element.dispatchEvent(keyEvent); element.dispatchEvent(new KeyboardEvent('keyup', {{ key: key, code: key, bubbles: true, cancelable: true }})); // Handle special keys if (key === 'Enter' && (element.tagName === 'INPUT' || element.tagName === 'TEXTAREA')) {{ const form = element.form; if (form && element.tagName === 'INPUT') {{ form.dispatchEvent(new Event('submit', {{ bubbles: true, cancelable: true }})); }} }} return JSON.stringify({{ success: true }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "wait_for_selector": selector = params.get("selector") options = params.get("options", {}) timeout = options.get("timeout", 30000) state = options.get("state", "visible") # Simplified synchronous check script = f""" (function() {{ const selector = {json.dumps(selector)}; const state = {json.dumps(state)}; const element = document.querySelector(selector); if (state === 'attached') {{ return JSON.stringify({{ success: !!element, found: !!element }}); }} else if (state === 'detached') {{ return JSON.stringify({{ success: !element, found: false }}); }} else if (state === 'visible') {{ if (!element) {{ return JSON.stringify({{ success: false, found: false }}); }} const style = window.getComputedStyle(element); const isVisible = style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0'; return JSON.stringify({{ success: isVisible, found: isVisible }}); }} else if (state === 'hidden') {{ if (!element) {{ return JSON.stringify({{ success: true, found: false }}); }} const style = window.getComputedStyle(element); const isHidden = style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0'; return JSON.stringify({{ success: isHidden, found: true }}); }} return JSON.stringify({{ success: false, found: false }}); }})() """ # For now, just do a single check (polling would require more complex implementation) # In a real implementation, you might want to implement polling on the Python side self._execute_simple_js(script, request_id) elif action == "wait_for_load_state": state = params.get("state", "load") timeout = params.get("timeout", 30000) script = f""" (async function() {{ const state = {json.dumps(state)}; const timeout = {timeout}; const startTime = Date.now(); if (state === 'domcontentloaded') {{ if (document.readyState === 'loading') {{ await new Promise(resolve => {{ document.addEventListener('DOMContentLoaded', resolve); }}); }} return JSON.stringify({{ success: true }}); }} else if (state === 'load') {{ if (document.readyState !== 'complete') {{ await new Promise(resolve => {{ window.addEventListener('load', resolve); }}); }} return JSON.stringify({{ success: true }}); }} else if (state === 'networkidle') {{ // Simple network idle implementation let pendingRequests = 0; const observer = new PerformanceObserver((list) => {{ for (const entry of list.getEntries()) {{ if (entry.entryType === 'resource') {{ pendingRequests--; }} }} }}); observer.observe({{ entryTypes: ['resource'] }}); while (Date.now() - startTime < timeout) {{ if (pendingRequests <= 0) {{ observer.disconnect(); return JSON.stringify({{ success: true }}); }} await new Promise(r => setTimeout(r, 500)); }} observer.disconnect(); return JSON.stringify({{ success: false, error: 'Timeout waiting for network idle' }}); }} return JSON.stringify({{ success: true }}); }})() """ self.execute_javascript(script, request_id) elif action == "wait_for_timeout": timeout = params.get("timeout", 1000) script = f""" (async function() {{ await new Promise(r => setTimeout(r, {timeout})); return JSON.stringify({{ success: true }}); }})() """ self.execute_javascript(script, request_id) elif action == "get_attribute": selector = params.get("selector") name = params.get("name") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} const value = element.getAttribute({json.dumps(name)}); return JSON.stringify({{ success: true, value: value }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "inner_text": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} return JSON.stringify({{ success: true, text: element.innerText || '' }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "inner_html": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} return JSON.stringify({{ success: true, html: element.innerHTML || '' }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "is_visible": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: true, visible: false }}); }} const style = window.getComputedStyle(element); const isVisible = style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && element.offsetWidth > 0 && element.offsetHeight > 0; return JSON.stringify({{ success: true, visible: isVisible }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "is_enabled": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} const isEnabled = !element.disabled; return JSON.stringify({{ success: true, enabled: isEnabled }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "is_checked": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); if (!element) {{ return JSON.stringify({{ success: false, error: 'Element not found' }}); }} const isChecked = element.checked || false; return JSON.stringify({{ success: true, checked: isChecked }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "query_selector": selector = params.get("selector") script = f""" (function() {{ try {{ const element = document.querySelector({json.dumps(selector)}); return JSON.stringify({{ success: true, found: !!element }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) elif action == "query_selector_all": selector = params.get("selector") script = f""" (function() {{ try {{ const elements = document.querySelectorAll({json.dumps(selector)}); return JSON.stringify({{ success: true, count: elements.length }}); }} catch (e) {{ return JSON.stringify({{ success: false, error: e.toString() }}); }} }})() """ self._execute_simple_js(script, request_id) else: coro = self.send_response(request_id, None, f"Unknown Playwright action: {action}") asyncio.run_coroutine_threadsafe(coro, self.loop) async def handle_command(self, command: dict): """Handle commands from WebSocket client.""" cmd_type = command.get("command") request_id = command.get("request_id", str(uuid.uuid4())) if not cmd_type: await self.send_response(request_id, None, "No command specified") return if not self.is_ready and cmd_type not in ["get_info", "close"]: await self.send_response(request_id, None, "Browser not ready yet") return try: # Backward compatible commands if cmd_type == "navigate": url = command.get("url") if not url: await self.send_response(request_id, None, "URL is required") return # Basic URL validation if not url.startswith(('http://', 'https://', 'file://', 'about:')): url = 'https://' + url def safe_navigate(): try: if self.webview and not self.is_closing: self.webview.load_uri(url) except Exception as e: logger.error(f"Error navigating to {url}: {e}") GLib.idle_add(safe_navigate) await self.send_response(request_id, {"status": "navigating", "url": url}) elif cmd_type == "execute_js": script = command.get("script") if not script: await self.send_response(request_id, None, "Script is required") return GLib.idle_add(self.execute_javascript, script, request_id) elif cmd_type == "simulate_typing": selector = command.get("selector") text = command.get("text", "") delay = command.get("delay", 0.1) if not selector: await self.send_response(request_id, None, "Selector is required") return GLib.idle_add(self.simulate_typing, selector, text, request_id, delay) elif cmd_type == "go_back": def safe_go_back(): try: if self.webview and self.webview.can_go_back(): self.webview.go_back() except Exception as e: logger.error(f"Error going back: {e}") GLib.idle_add(safe_go_back) await self.send_response(request_id, {"status": "ok"}) elif cmd_type == "go_forward": def safe_go_forward(): try: if self.webview and self.webview.can_go_forward(): self.webview.go_forward() except Exception as e: logger.error(f"Error going forward: {e}") GLib.idle_add(safe_go_forward) await self.send_response(request_id, {"status": "ok"}) elif cmd_type == "reload": def safe_reload(): try: if self.webview and not self.is_closing: self.webview.reload() except Exception as e: logger.error(f"Error reloading: {e}") GLib.idle_add(safe_reload) await self.send_response(request_id, {"status": "reloading"}) elif cmd_type == "get_info": def get_browser_info(): try: info = { "is_ready": self.is_ready, "is_closing": self.is_closing } if self.webview and self.is_ready: try: info["url"] = self.webview.get_uri() except: info["url"] = None try: info["title"] = self.webview.get_title() except: info["title"] = None try: info["is_loading"] = self.webview.is_loading() except: info["is_loading"] = False try: info["can_go_back"] = self.webview.can_go_back() except: info["can_go_back"] = False try: info["can_go_forward"] = self.webview.can_go_forward() except: info["can_go_forward"] = False coro = self.send_response(request_id, info) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"Error getting browser info: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) GLib.idle_add(get_browser_info) elif cmd_type == "screenshot": try: with self._gtk_lock: self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=60) except Exception as e: await self.send_response(request_id, None, f"Error preparing screenshot: {str(e)}") return def on_snapshot(webview, task, data): try: with self._gtk_lock: callback = self.pending_callbacks.pop(request_id, None) if not callback: return except Exception as e: logger.error(f"Error retrieving screenshot callback: {e}") return try: surface = webview.get_snapshot_finish(task) # Save to bytes import io import cairo buf = io.BytesIO() surface.write_to_png(buf) buf.seek(0) screenshot_b64 = base64.b64encode(buf.read()).decode('utf-8') coro = self.send_response(request_id, {"screenshot": screenshot_b64, "format": "png"}) asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: error_msg = f"Error capturing screenshot: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) def take_snapshot(): try: if self.webview and not self.is_closing: self.webview.get_snapshot( WebKit2.SnapshotRegion.FULL_DOCUMENT, WebKit2.SnapshotOptions.NONE, None, on_snapshot, None ) else: with self._gtk_lock: self.pending_callbacks.pop(request_id, None) coro = self.send_response(request_id, None, "Browser not available") asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: with self._gtk_lock: self.pending_callbacks.pop(request_id, None) error_msg = f"Error starting screenshot: {str(e)}" logger.error(error_msg) coro = self.send_response(request_id, None, error_msg) asyncio.run_coroutine_threadsafe(coro, self.loop) GLib.idle_add(take_snapshot) elif cmd_type == "set_html": html = command.get("html", "") base_uri = command.get("base_uri") def safe_load_html(): try: if self.webview and not self.is_closing: self.webview.load_html(html, base_uri) except Exception as e: logger.error(f"Error loading HTML: {e}") GLib.idle_add(safe_load_html) await self.send_response(request_id, {"status": "html_loaded"}) # Playwright-compatible commands elif cmd_type == "goto": url = command.get("url") options = command.get("options", {}) if not url: await self.send_response(request_id, None, "URL is required") return # Basic URL validation if not url.startswith(('http://', 'https://', 'file://', 'about:')): url = 'https://' + url def safe_navigate(): try: if self.webview and not self.is_closing: self.webview.load_uri(url) except Exception as e: logger.error(f"Error navigating to {url}: {e}") GLib.idle_add(safe_navigate) await self.send_response(request_id, {"status": "navigating", "url": url}) elif cmd_type == "evaluate": expression = command.get("expression") if not expression: await self.send_response(request_id, None, "Expression is required") return GLib.idle_add(self.execute_javascript, expression, request_id) elif cmd_type == "content": script = "document.documentElement.outerHTML" GLib.idle_add(self.execute_javascript, script, request_id) elif cmd_type == "title": script = "document.title" GLib.idle_add(self.execute_javascript, script, request_id) elif cmd_type == "url": def get_url(): try: if self.webview and self.is_ready: url = self.webview.get_uri() coro = self.send_response(request_id, url) else: coro = self.send_response(request_id, None, "Browser not ready") asyncio.run_coroutine_threadsafe(coro, self.loop) except Exception as e: coro = self.send_response(request_id, None, str(e)) asyncio.run_coroutine_threadsafe(coro, self.loop) GLib.idle_add(get_url) elif cmd_type == "set_content": html = command.get("html", "") options = command.get("options", {}) def safe_load_html(): try: if self.webview and not self.is_closing: self.webview.load_html(html, None) except Exception as e: logger.error(f"Error loading HTML: {e}") GLib.idle_add(safe_load_html) await self.send_response(request_id, {"status": "html_loaded"}) elif cmd_type == "type": selector = command.get("selector") text = command.get("text", "") options = command.get("options", {}) delay = options.get("delay", 0) if not selector: await self.send_response(request_id, None, "Selector is required") return GLib.idle_add(self.simulate_typing, selector, text, request_id, delay / 1000.0) elif cmd_type == "playwright": # Handle Playwright-style actions action = command.get("action") params = command.get("params", {}) if not action: await self.send_response(request_id, None, "Action is required") return GLib.idle_add(self.execute_playwright_action, action, params, request_id) elif cmd_type == "close": await self.send_response(request_id, {"status": "closing"}) await self.close() else: await self.send_response(request_id, None, f"Unknown command: {cmd_type}") except Exception as e: error_msg = f"Error handling command {cmd_type}: {str(e)}" logger.error(f"{error_msg}\n{traceback.format_exc()}") await self.send_response(request_id, None, error_msg) async def close(self): """Close the browser window and cleanup.""" if self.is_closing: return self.is_closing = True logger.info(f"Closing browser for connection {self.connection_id}") # Cancel cleanup task if self._cleanup_task: self._cleanup_task.cancel() try: await self._cleanup_task except asyncio.CancelledError: pass # Send responses for all pending callbacks async with self._lock: for callback in list(self.pending_callbacks.values()): await self.send_response(callback.request_id, None, "Browser closing") self.pending_callbacks.clear() def close_window(): try: with self._gtk_lock: if self.window: self.window.destroy() self.window = None self.webview = None except Exception as e: logger.error(f"Error closing window: {e}") GLib.idle_add(close_window) # Remove from server connections if self.connection_id in self.server.connections: del self.server.connections[self.connection_id] class BrowserServer: """ WebSocket server that manages multiple browser instances. """ def __init__(self, host: str = "localhost", port: int = 8765, max_connections: int = 10): self.host = host self.port = port self.max_connections = max_connections self.connections: Dict[str, RemoteBrowser] = {} self.gtk_thread = None self.gtk_ready = threading.Event() self.server = None self.is_running = False self.executor = ThreadPoolExecutor(max_workers=4) self.loop: Optional[asyncio.AbstractEventLoop] = None self._shutdown_lock = threading.Lock() def start_gtk_thread(self): """Start GTK main loop in a separate thread.""" def gtk_main(): try: logger.info("Starting GTK main loop") GLib.timeout_add(100, lambda: self.gtk_ready.set() or False) Gtk.main() except Exception as e: logger.error(f"GTK main loop error: {e}") finally: logger.info("GTK main loop ended") self.gtk_thread = threading.Thread(target=gtk_main, daemon=True) self.gtk_thread.start() # Wait for GTK to be ready if not self.gtk_ready.wait(5): raise Exception("GTK initialization timeout") async def handle_connection(self, websocket, path): """Handle a new WebSocket connection.""" if len(self.connections) >= self.max_connections: await websocket.close(1008, f"Maximum connections ({self.max_connections}) reached") return connection_id = str(uuid.uuid4()) logger.info(f"New connection: {connection_id}") browser = RemoteBrowser(connection_id, websocket, self, self.loop) self.connections[connection_id] = browser try: # Start browser tasks await browser.start() # Create browser in GTK thread future = self.loop.create_future() def create_browser_wrapper(): try: browser.create_browser() if browser._creation_error: self.loop.call_soon_threadsafe(future.set_exception, Exception(browser._creation_error)) else: self.loop.call_soon_threadsafe(future.set_result, True) except Exception as e: self.loop.call_soon_threadsafe(future.set_exception, e) GLib.idle_add(create_browser_wrapper) # Wait for browser creation with timeout await asyncio.wait_for(future, timeout=15.0) # Send connection confirmation await websocket.send(json.dumps({ "type": "connected", "connection_id": connection_id, "message": "Browser window created", "webkit_version": webkit_version })) # Handle incoming messages async for message in websocket: if browser.is_closing: break request_id = None try: command = json.loads(message) request_id = command.get("request_id") await browser.handle_command(command) except json.JSONDecodeError as e: error_msg = f"Invalid JSON: {str(e)}" logger.error(f"{error_msg} - Message: {message[:100]}...") await browser.send_response(request_id, None, error_msg) except Exception as e: error_msg = f"Error handling message: {str(e)}" logger.error(f"{error_msg} for request '{request_id}'\n{traceback.format_exc()}") await browser.send_response(request_id, None, error_msg) except asyncio.TimeoutError: logger.error(f"Browser creation timeout for {connection_id}") await browser.send_event("browser_error", {"error": "Browser creation timeout"}) except websockets.exceptions.ConnectionClosed: logger.info(f"Connection closed: {connection_id}") except Exception as e: error_msg = f"Connection error {connection_id}: {str(e)}" logger.error(f"{error_msg}\n{traceback.format_exc()}") try: await browser.send_event("server_error", {"error": str(e)}) except: pass finally: logger.info(f"Cleaning up connection: {connection_id}") await browser.close() async def start_server(self): """Start the WebSocket server.""" logger.info(f"Starting WebSocket server on ws://{self.host}:{self.port}") self.server = await websockets.serve( self.handle_connection, self.host, self.port, max_size=10**7, # 10MB max message size ping_interval=20, ping_timeout=10, compression=None # Disable compression for stability ) self.is_running = True logger.info("WebSocket server started successfully") # Wait for server to close await self.server.wait_closed() async def shutdown(self): """Gracefully shutdown the server.""" with self._shutdown_lock: if not self.is_running: return logger.info("Shutting down server...") self.is_running = False # Close all browser connections browsers = list(self.connections.values()) if browsers: await asyncio.gather( *[browser.close() for browser in browsers], return_exceptions=True ) # Close WebSocket server if self.server: self.server.close() await self.server.wait_closed() # Shutdown executor self.executor.shutdown(wait=True) # Stop GTK main loop def quit_gtk(): try: Gtk.main_quit() except: pass GLib.idle_add(quit_gtk) # Wait for GTK thread to finish if self.gtk_thread and self.gtk_thread.is_alive(): self.gtk_thread.join(timeout=5) logger.info("Server shutdown complete") async def run(self): """Run the server with proper error handling.""" try: self.loop = asyncio.get_running_loop() self.start_gtk_thread() await self.start_server() except KeyboardInterrupt: logger.info("Received interrupt signal") except Exception as e: logger.error(f"Server error: {e}\n{traceback.format_exc()}") raise finally: await self.shutdown() def main(): """Main entry point with proper signal handling.""" import signal server = BrowserServer() # Setup signal handlers def signal_handler(sig, frame): logger.info(f"Received signal {sig}") asyncio.create_task(server.shutdown()) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: asyncio.run(server.run()) except KeyboardInterrupt: logger.info("Exiting.") except Exception as e: logger.error(f"Fatal error: {e}") exit(1) if __name__ == "__main__": main()