#!/usr/bin/env python3
"""
Production-ready WebSocket Browser Server with Playwright-compatible API
Manages multiple WebKit2GTK browser instances with improved concurrency.
This server is designed to work with various WebKit2 versions and automatically
adapts to the available API. It supports both the original command API and
the new Playwright-compatible API for maximum flexibility.
WebKit2 Version Compatibility:
- WebKit2 4.1: Full support with newer evaluate_javascript API
- WebKit2 4.0: Full support with run_javascript API
- WebKit2 3.0: Basic support with fallback mechanisms
The server automatically detects the available WebKit2 version and uses
appropriate methods for JavaScript execution and result extraction.
"""
import gi
gi.require_version('Gtk', '3.0')
# Attempt to load WebKit2 with version fallback
webkit_loaded = False
webkit_version = None
for version in ['4.1', '4.0', '3.0']:
try:
gi.require_version('WebKit2', version)
webkit_loaded = True
webkit_version = version
break
except ValueError:
continue
if not webkit_loaded:
print("Error: WebKit2 is not installed or available.")
print("Please install it using:")
print(" Ubuntu/Debian: sudo apt-get install gir1.2-webkit2-4.0")
print(" Fedora: sudo dnf install webkit2gtk3")
print(" Arch: sudo pacman -S webkit2gtk")
exit(1)
from gi.repository import Gtk, WebKit2, GLib, Gdk
import asyncio
import websockets
import json
import threading
import uuid
import time
import base64
import logging
from typing import Dict, Optional, Any, List
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import traceback
import weakref
import re
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
print(f"Successfully loaded WebKit2 version {webkit_version}")
@dataclass
class PendingCallback:
"""Container for pending JavaScript callbacks"""
request_id: str
timestamp: float
timeout: float = 30.0
@property
def is_expired(self) -> bool:
return time.time() - self.timestamp > self.timeout
class RemoteBrowser:
"""
A WebKit2GTK browser instance that can be controlled via WebSocket.
Each connection gets its own browser window with improved concurrency.
"""
def __init__(self, connection_id: str, websocket, server, loop: asyncio.AbstractEventLoop):
self.connection_id = connection_id
self.websocket = websocket
self.server = server
self.loop = loop # The asyncio event loop from the main thread
self.window = None
self.webview = None
self.pending_callbacks: Dict[str, PendingCallback] = {}
self.is_closing = False
self.is_ready = False
self._lock = asyncio.Lock()
self._cleanup_task = None
self._gtk_lock = threading.Lock()
self._browser_created = threading.Event()
self._creation_error = None
async def start(self):
"""Start the browser and cleanup task"""
try:
# Start cleanup task for expired callbacks
self._cleanup_task = asyncio.create_task(self._cleanup_expired_callbacks())
except Exception as e:
logger.error(f"Error starting browser tasks: {e}")
raise
async def _cleanup_expired_callbacks(self):
"""Periodically clean up expired callbacks"""
while not self.is_closing:
try:
await asyncio.sleep(5) # Check every 5 seconds
async with self._lock:
expired_ids = [
callback_id for callback_id, callback in self.pending_callbacks.items()
if callback.is_expired
]
for callback_id in expired_ids:
try:
async with self._lock:
callback = self.pending_callbacks.pop(callback_id, None)
if callback:
logger.warning(f"Command timed out for request_id: {callback.request_id}")
await self.send_response(
callback.request_id,
None,
f"Command for request_id {callback.request_id} timed out"
)
except Exception as e:
logger.error(f"Error handling expired callback {callback_id}: {e}")
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in cleanup task: {e}")
def create_browser(self):
"""Create the browser window in the GTK thread."""
try:
with self._gtk_lock:
if self.is_closing:
return
self.window = Gtk.Window(title=f"Remote Browser - {self.connection_id[:8]}")
self.window.set_default_size(1280, 720)
self.window.connect("destroy", self.on_window_destroy)
# Set window properties for stability
self.window.set_resizable(True)
self.window.set_deletable(True)
main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.window.add(main_box)
nav_bar = self.create_navigation_bar()
main_box.pack_start(nav_bar, False, False, 0)
self.webview = WebKit2.WebView()
if not self.setup_webview():
raise Exception("Failed to setup WebView")
scrolled_window = Gtk.ScrolledWindow()
scrolled_window.add(self.webview)
main_box.pack_start(scrolled_window, True, True, 0)
self.status_bar = Gtk.Statusbar()
self.status_context = self.status_bar.get_context_id("connection")
self.status_bar.push(self.status_context, f"Connected: {self.connection_id[:8]}")
main_box.pack_start(self.status_bar, False, False, 0)
self.window.show_all()
self.window.present()
self.is_ready = True
self._browser_created.set()
GLib.idle_add(self._notify_ready)
except Exception as e:
logger.error(f"Error creating browser window: {e}")
self._creation_error = str(e)
self._browser_created.set()
GLib.idle_add(self._notify_error, str(e))
def _notify_ready(self):
"""Notify that browser is ready (called from GTK thread)."""
if not self.is_closing:
try:
coro = self.send_event("browser_ready", {"connection_id": self.connection_id})
asyncio.run_coroutine_threadsafe(coro, self.loop)
except:
pass
def _execute_wrapped_js(self, script: str, request_id: str):
"""Execute JavaScript wrapped to always return a string for maximum compatibility."""
def js_finished(webview, task, user_data):
try:
# Try to get the result string directly
result_str = None
try:
# For very old WebKit2 versions, try the simple approach
result = webview.run_javascript_finish(task)
result_str = str(result)
# Check if the result looks like our wrapped response
if result_str and ('"success":true' in result_str or '"success":false' in result_str):
# Extract the JSON from the string representation
import re
json_match = re.search(r'\{.*\}', result_str)
if json_match:
result_str = json_match.group(0)
except Exception as e:
logger.debug(f"Failed to get result string: {e}")
if result_str:
try:
parsed = json.loads(result_str)
if parsed.get("success"):
value = parsed.get("value")
else:
value = None
error_msg = parsed.get("error", "Unknown error")
coro = self.send_response(request_id, value, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
except json.JSONDecodeError:
value = result_str
else:
value = None
coro = self.send_response(request_id, value)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"Error in wrapped JS execution: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
self.webview.run_javascript(script, None, js_finished, None)
except Exception as e:
error_msg = f"Failed to execute wrapped JavaScript: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
def _execute_wait_check(self, script: str, check_id: str, callback):
"""Execute a simple check for wait operations."""
def js_finished(webview, task, user_data):
try:
result_str = None
try:
result = webview.run_javascript_finish(task)
# Extract the JSON string from the result
if hasattr(result, 'get_js_value'):
js_value = result.get_js_value()
if hasattr(js_value, 'to_string'):
result_str = js_value.to_string()
elif hasattr(result, 'get_value'):
result_str = result.get_value()
else:
result_str = str(result)
# Clean up string representation
import re
json_match = re.search(r'\{.*\}', result_str)
if json_match:
result_str = json_match.group(0)
except Exception as e:
logger.debug(f"Failed to get wait check result: {e}")
callback(result_str)
except Exception as e:
logger.error(f"Error in wait check: {e}")
callback(None)
try:
self.webview.run_javascript(script, None, js_finished, None)
except Exception as e:
logger.error(f"Failed to execute wait check: {e}")
callback(None)
def _notify_error(self, error_msg):
"""Notify about browser creation error (called from GTK thread)."""
if not self.is_closing:
try:
coro = self.send_event("browser_error", {"error": error_msg})
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
logger.error(f"Error notifying browser error: {e}")
def create_navigation_bar(self):
"""Create a simple navigation bar."""
try:
nav_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
nav_bar.set_margin_top(5)
nav_bar.set_margin_bottom(5)
# Use newer margin methods instead of deprecated ones
nav_bar.set_margin_start(5)
nav_bar.set_margin_end(5)
self.url_label = Gtk.Label()
self.url_label.set_ellipsize(True)
self.url_label.set_max_width_chars(50)
nav_bar.pack_start(self.url_label, True, True, 0)
return nav_bar
except Exception as e:
logger.error(f"Error creating navigation bar: {e}")
# Return minimal nav bar on error
return Gtk.Box()
def setup_webview(self) -> bool:
"""Configure WebKit view settings."""
try:
if not self.webview:
return False
settings = self.webview.get_settings()
settings.set_enable_developer_extras(True)
settings.set_enable_javascript(True)
settings.set_allow_file_access_from_file_urls(True)
settings.set_enable_webgl(True)
settings.set_enable_media_stream(True)
# Only set non-deprecated security settings
try:
# These might be available in newer versions
if hasattr(settings, 'set_enable_java'):
settings.set_enable_java(False)
if hasattr(settings, 'set_enable_plugins'):
settings.set_enable_plugins(False)
except:
# Ignore if not available
pass
# Connect signals with weak references to avoid circular references
self.webview.connect("load-changed", self.on_load_changed)
self.webview.connect("notify::uri", self.on_uri_changed)
self.webview.connect("load-failed", self.on_load_failed)
self.webview.connect("notify::title", self.on_title_changed)
return True
except Exception as e:
logger.error(f"Error setting up webview: {e}")
return False
def on_window_destroy(self, widget):
"""Handle window close event."""
if not self.is_closing:
self.is_closing = True
try:
asyncio.run_coroutine_threadsafe(self.close(), self.loop)
except Exception as e:
logger.error(f"Error scheduling close: {e}")
def on_load_changed(self, webview, load_event):
"""Handle page load events (called from GTK thread)."""
if self.is_closing:
return
try:
event_data = {}
# Safely get URI and title
try:
uri = webview.get_uri()
if uri:
event_data["url"] = uri
except:
pass
try:
title = webview.get_title()
if title:
event_data["title"] = title
except:
pass
if load_event == WebKit2.LoadEvent.FINISHED:
coro = self.send_event("load_finished", event_data)
asyncio.run_coroutine_threadsafe(coro, self.loop)
elif load_event == WebKit2.LoadEvent.STARTED:
coro = self.send_event("load_started", event_data)
asyncio.run_coroutine_threadsafe(coro, self.loop)
elif load_event == WebKit2.LoadEvent.COMMITTED:
coro = self.send_event("load_committed", event_data)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
logger.error(f"Error in load changed handler: {e}")
def on_uri_changed(self, webview, param):
"""Update URL label when URI changes."""
if self.is_closing:
return
try:
uri = webview.get_uri()
if uri and self.url_label:
self.url_label.set_text(uri)
except Exception as e:
logger.error(f"Error updating URI: {e}")
def on_title_changed(self, webview, param):
"""Handle title changes (called from GTK thread)."""
if self.is_closing:
return
try:
title = webview.get_title()
uri = None
try:
uri = webview.get_uri()
except:
pass
if title:
event_data = {"title": title}
if uri:
event_data["url"] = uri
coro = self.send_event("title_changed", event_data)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
logger.error(f"Error in title changed handler: {e}")
def on_load_failed(self, webview, load_event, failing_uri, error):
"""Handle load failures (called from GTK thread)."""
if self.is_closing:
return
try:
error_msg = error.message if error else "Unknown error"
coro = self.send_event("load_failed", {"url": failing_uri, "error": error_msg})
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
logger.error(f"Error in load failed handler: {e}")
async def send_event(self, event_type: str, data: dict):
"""Send an event to the WebSocket client."""
if self.is_closing or not self.websocket:
return
try:
async with self._lock:
if self.websocket.state == websockets.protocol.State.OPEN:
await self.websocket.send(json.dumps({
"type": "event",
"event": event_type,
"data": data,
"timestamp": time.time()
}))
except websockets.exceptions.ConnectionClosed:
self.is_closing = True
except Exception as e:
logger.error(f"Error sending event {event_type}: {e}")
async def send_response(self, request_id: str, result: Any, error: Optional[str] = None):
"""Send a response to a command."""
if self.is_closing or not self.websocket:
return
try:
async with self._lock:
if self.websocket.state == websockets.protocol.State.OPEN:
await self.websocket.send(json.dumps({
"type": "response",
"request_id": request_id,
"result": result,
"error": error,
"timestamp": time.time()
}))
except websockets.exceptions.ConnectionClosed:
self.is_closing = True
except Exception as e:
logger.error(f"Error sending response for {request_id}: {e}")
def execute_javascript(self, script: str, request_id: str):
"""Execute JavaScript and send result back via WebSocket."""
if not self.webview or self.is_closing:
coro = self.send_response(request_id, None, "Browser not ready")
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
# Store callback info with lock
try:
with self._gtk_lock:
self.pending_callbacks[request_id] = PendingCallback(request_id, time.time())
except Exception as e:
logger.error(f"Error storing callback: {e}")
coro = self.send_response(request_id, None, str(e))
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
def js_finished(webview, task, user_data):
try:
with self._gtk_lock:
callback = self.pending_callbacks.pop(request_id, None)
if not callback:
return # Timed out or already handled
except Exception as e:
logger.error(f"Error retrieving callback: {e}")
return
try:
# Different WebKit2 versions have different APIs
result = None
value = None
# Try newer API first
if hasattr(webview, 'evaluate_javascript_finish'):
try:
result = webview.evaluate_javascript_finish(task)
except Exception as e:
logger.debug(f"evaluate_javascript_finish failed: {e}")
result = None
# Fallback to older API
if result is None and hasattr(webview, 'run_javascript_finish'):
try:
result = webview.run_javascript_finish(task)
except Exception as e:
logger.debug(f"run_javascript_finish failed: {e}")
result = None
if result is None:
# If both methods failed, try to return a simple result
# This might happen with very old WebKit2 versions
logger.info("Falling back to JSON string execution for compatibility")
# Wrap the script to always return a JSON string
wrapped_script = f"""
(function() {{
try {{
var result = ({script});
return JSON.stringify({{success: true, value: result}});
}} catch (e) {{
return JSON.stringify({{success: false, error: e.toString()}});
}}
}})()
"""
self._execute_wrapped_js(wrapped_script, request_id)
return
# Try to extract the value from the result
# Different WebKit2 versions use different methods
if hasattr(result, 'get_js_value'):
js_result = result.get_js_value()
if hasattr(js_result, 'is_string') and js_result.is_string():
value = js_result.to_string()
elif hasattr(js_result, 'is_number') and js_result.is_number():
value = js_result.to_double()
elif hasattr(js_result, 'is_boolean') and js_result.is_boolean():
value = js_result.to_boolean()
elif hasattr(js_result, 'is_object') and (js_result.is_object() or js_result.is_array()):
# Handle JSON serialization
json_script = f"JSON.stringify(({script}))"
self._execute_simple_js(json_script, request_id)
return
else:
value = None
elif hasattr(result, 'get_value'):
# Older WebKit2 API
try:
value = result.get_value()
except:
# If get_value fails, try JSON serialization
json_script = f"JSON.stringify(({script}))"
self._execute_simple_js(json_script, request_id)
return
else:
# Last resort: try JSON serialization
json_script = f"JSON.stringify(({script}))"
self._execute_simple_js(json_script, request_id)
return
coro = self.send_response(request_id, value)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"JavaScript execution error: {str(e)}"
logger.error(f"{error_msg} - Script: {script[:100]}...")
# Try simple JSON serialization as fallback
try:
json_script = f"JSON.stringify(({script}))"
self._execute_simple_js(json_script, request_id)
except:
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
# Use newer evaluate_javascript method if available
if hasattr(self.webview, 'evaluate_javascript'):
self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None)
else:
# Fallback to deprecated method
self.webview.run_javascript(script, None, js_finished, None)
except Exception as e:
with self._gtk_lock:
self.pending_callbacks.pop(request_id, None)
error_msg = f"Failed to execute JavaScript: {str(e)}"
logger.error(f"{error_msg} - Script: {script[:100]}...")
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
def _execute_simple_js(self, script: str, request_id: str):
"""Execute simple JavaScript that returns a string result."""
def js_finished(webview, task, user_data):
try:
# Different WebKit2 versions have different APIs
result = None
json_str = None
# Try newer API first
if hasattr(webview, 'evaluate_javascript_finish'):
try:
result = webview.evaluate_javascript_finish(task)
except Exception as e:
logger.debug(f"evaluate_javascript_finish failed: {e}")
result = None
# Fallback to older API
if result is None and hasattr(webview, 'run_javascript_finish'):
try:
result = webview.run_javascript_finish(task)
except Exception as e:
logger.debug(f"run_javascript_finish failed: {e}")
result = None
if result is None:
error_msg = "Failed to get JavaScript result"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
# Try to extract the string value
if hasattr(result, 'get_js_value'):
js_result = result.get_js_value()
if hasattr(js_result, 'to_string'):
json_str = js_result.to_string()
else:
json_str = str(js_result)
elif hasattr(result, 'get_value'):
# Older WebKit2 API
try:
json_str = result.get_value()
except:
json_str = str(result)
else:
# Last resort
json_str = str(result)
try:
value = json.loads(json_str) if json_str else None
except json.JSONDecodeError:
value = json_str # Return as string if not valid JSON
coro = self.send_response(request_id, value)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"Error parsing JavaScript result: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
# Use newer evaluate_javascript method if available
if hasattr(self.webview, 'evaluate_javascript'):
self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None)
else:
# Fallback to deprecated method
self.webview.run_javascript(script, None, js_finished, None)
except Exception as e:
error_msg = f"Failed to execute simple JavaScript: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
def get_inner_html(self, selector: str, request_id):
script = f"document.querySelector({json.dumps(selector)})?.innerHTML || null"
self.execute_javascript(script, request_id)
def simulate_typing(self, selector: str, text: str, request_id: str, delay: float = 0.1):
"""Simulate typing into an element. This is now fully async in JS."""
if not self.webview or self.is_closing:
coro = self.send_response(request_id, None, "Browser not ready")
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
# Validate inputs
if not selector:
coro = self.send_response(request_id, None, "Selector cannot be empty")
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
# Sanitize delay
delay = max(0, min(delay, 5.0)) # Limit delay between 0 and 5 seconds
# Store callback with extended timeout
timeout = 30 + len(text) * delay
try:
with self._gtk_lock:
self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=timeout)
except Exception as e:
logger.error(f"Error storing typing callback: {e}")
coro = self.send_response(request_id, None, str(e))
asyncio.run_coroutine_threadsafe(coro, self.loop)
return
# Create a self-contained async function that returns a stringified result
typing_script = f"""
(async function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found: {selector}' }});
}}
// Check if element is an input or textarea
if (!['INPUT', 'TEXTAREA'].includes(element.tagName) && !element.isContentEditable) {{
return JSON.stringify({{ success: false, error: 'Element is not editable' }});
}}
element.focus();
const text = {json.dumps(text)};
const delay = {delay * 1000};
// Clear existing value
if (element.value !== undefined) {{
element.value = '';
}}
for (const char of text) {{
if (element.value !== undefined) {{
element.value += char;
}} else if (element.isContentEditable) {{
element.textContent += char;
}}
element.dispatchEvent(new Event('input', {{ bubbles: true }}));
element.dispatchEvent(new KeyboardEvent('keydown', {{ key: char, bubbles: true }}));
element.dispatchEvent(new KeyboardEvent('keyup', {{ key: char, bubbles: true }}));
if (delay > 0) await new Promise(r => setTimeout(r, delay));
}}
// Trigger change event at the end
element.dispatchEvent(new Event('change', {{ bubbles: true }}));
return JSON.stringify({{ success: true, typed: text.length }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
def typing_finished(webview, task, user_data):
try:
with self._gtk_lock:
callback = self.pending_callbacks.pop(request_id, None)
if not callback:
return
except Exception as e:
logger.error(f"Error retrieving typing callback: {e}")
return
try:
# Different WebKit2 versions have different APIs
result = None
json_str = None
# Try newer API first
if hasattr(webview, 'evaluate_javascript_finish'):
try:
result = webview.evaluate_javascript_finish(task)
except Exception as e:
logger.debug(f"evaluate_javascript_finish failed: {e}")
result = None
# Fallback to older API
if result is None and hasattr(webview, 'run_javascript_finish'):
try:
result = webview.run_javascript_finish(task)
except Exception as e:
logger.debug(f"run_javascript_finish failed: {e}")
result = None
if result is None:
value = {"success": False, "error": "Failed to get typing result"}
else:
# Try to extract the string value
if hasattr(result, 'get_js_value'):
js_result = result.get_js_value()
if hasattr(js_result, 'to_string'):
json_str = js_result.to_string()
else:
json_str = str(js_result)
elif hasattr(result, 'get_value'):
# Older WebKit2 API
try:
json_str = result.get_value()
except:
json_str = str(result)
else:
# Last resort
json_str = str(result)
try:
value = json.loads(json_str) if json_str else None
except json.JSONDecodeError:
value = {"success": False, "error": f"Invalid JSON response: {json_str}"}
coro = self.send_response(request_id, value)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"Error in typing finished callback: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
try:
# Execute the typing script
# Use newer evaluate_javascript method if available
if hasattr(self.webview, 'evaluate_javascript'):
self.webview.evaluate_javascript(typing_script, -1, None, None, None, typing_finished, None)
else:
# Fallback to deprecated method
self.webview.run_javascript(typing_script, None, typing_finished, None)
except Exception as e:
with self._gtk_lock:
self.pending_callbacks.pop(request_id, None)
error_msg = f"Failed to start typing simulation: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
def execute_playwright_action(self, action: str, params: dict, request_id: str):
"""Execute a Playwright-style action."""
# For all Playwright actions, we use the more compatible JSON-wrapped approach
# This ensures the result is always a string that can be parsed
if action == "click":
selector = params.get("selector")
options = params.get("options", {})
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
// Get element position
const rect = element.getBoundingClientRect();
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
// Dispatch mouse events
const mouseEventInit = {{
bubbles: true,
cancelable: true,
view: window,
clientX: x,
clientY: y
}};
element.dispatchEvent(new MouseEvent('mousedown', mouseEventInit));
element.dispatchEvent(new MouseEvent('mouseup', mouseEventInit));
element.dispatchEvent(new MouseEvent('click', mouseEventInit));
// Also trigger if it's a link or button
if (element.tagName === 'A' || element.tagName === 'BUTTON') {{
element.click();
}}
return JSON.stringify({{ success: true }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "fill":
selector = params.get("selector")
value = params.get("value", "")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
element.focus();
if (element.value !== undefined) {{
element.value = {json.dumps(value)};
}} else if (element.isContentEditable) {{
element.textContent = {json.dumps(value)};
}} else {{
return JSON.stringify({{ success: false, error: 'Element is not fillable' }});
}}
element.dispatchEvent(new Event('input', {{ bubbles: true }}));
element.dispatchEvent(new Event('change', {{ bubbles: true }}));
return JSON.stringify({{ success: true }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "hover":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
const rect = element.getBoundingClientRect();
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const mouseEvent = new MouseEvent('mouseover', {{
bubbles: true,
cancelable: true,
view: window,
clientX: x,
clientY: y
}});
element.dispatchEvent(mouseEvent);
element.dispatchEvent(new MouseEvent('mouseenter', {{ bubbles: true }}));
return JSON.stringify({{ success: true }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "focus":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
element.focus();
return JSON.stringify({{ success: true }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "check":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
if (element.type !== 'checkbox' && element.type !== 'radio') {{
return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }});
}}
if (!element.checked) {{
element.checked = true;
element.dispatchEvent(new Event('change', {{ bubbles: true }}));
}}
return JSON.stringify({{ success: true, checked: element.checked }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "uncheck":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
if (element.type !== 'checkbox' && element.type !== 'radio') {{
return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }});
}}
if (element.checked) {{
element.checked = false;
element.dispatchEvent(new Event('change', {{ bubbles: true }}));
}}
return JSON.stringify({{ success: true, checked: element.checked }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "select_option":
selector = params.get("selector")
values = params.get("values", [])
if isinstance(values, str):
values = [values]
script = f"""
(function() {{
try {{
const select = document.querySelector({json.dumps(selector)});
if (!select) {{
return JSON.stringify({{ success: false, error: 'Select element not found' }});
}}
if (select.tagName !== 'SELECT') {{
return JSON.stringify({{ success: false, error: 'Element is not a select' }});
}}
const values = {json.dumps(values)};
const selected = [];
// Clear previous selections if not multiple
if (!select.multiple) {{
Array.from(select.options).forEach(opt => opt.selected = false);
}}
// Select new values
for (const value of values) {{
const option = Array.from(select.options).find(
opt => opt.value === value || opt.text === value
);
if (option) {{
option.selected = true;
selected.push(option.value);
}}
}}
select.dispatchEvent(new Event('change', {{ bubbles: true }}));
return JSON.stringify({{ success: true, selected: selected }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "press":
selector = params.get("selector")
key = params.get("key")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
element.focus();
const key = {json.dumps(key)};
const keyEvent = new KeyboardEvent('keydown', {{
key: key,
code: key,
bubbles: true,
cancelable: true
}});
element.dispatchEvent(keyEvent);
element.dispatchEvent(new KeyboardEvent('keyup', {{
key: key,
code: key,
bubbles: true,
cancelable: true
}}));
// Handle special keys
if (key === 'Enter' && (element.tagName === 'INPUT' || element.tagName === 'TEXTAREA')) {{
const form = element.form;
if (form && element.tagName === 'INPUT') {{
form.dispatchEvent(new Event('submit', {{ bubbles: true, cancelable: true }}));
}}
}}
return JSON.stringify({{ success: true }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "wait_for_selector":
selector = params.get("selector")
options = params.get("options", {})
timeout = options.get("timeout", 30000)
state = options.get("state", "visible")
# Simplified synchronous check
script = f"""
(function() {{
const selector = {json.dumps(selector)};
const state = {json.dumps(state)};
const element = document.querySelector(selector);
if (state === 'attached') {{
return JSON.stringify({{ success: !!element, found: !!element }});
}} else if (state === 'detached') {{
return JSON.stringify({{ success: !element, found: false }});
}} else if (state === 'visible') {{
if (!element) {{
return JSON.stringify({{ success: false, found: false }});
}}
const style = window.getComputedStyle(element);
const isVisible = style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0';
return JSON.stringify({{ success: isVisible, found: isVisible }});
}} else if (state === 'hidden') {{
if (!element) {{
return JSON.stringify({{ success: true, found: false }});
}}
const style = window.getComputedStyle(element);
const isHidden = style.display === 'none' ||
style.visibility === 'hidden' ||
style.opacity === '0';
return JSON.stringify({{ success: isHidden, found: true }});
}}
return JSON.stringify({{ success: false, found: false }});
}})()
"""
# For now, just do a single check (polling would require more complex implementation)
# In a real implementation, you might want to implement polling on the Python side
self._execute_simple_js(script, request_id)
elif action == "wait_for_load_state":
state = params.get("state", "load")
timeout = params.get("timeout", 30000)
script = f"""
(async function() {{
const state = {json.dumps(state)};
const timeout = {timeout};
const startTime = Date.now();
if (state === 'domcontentloaded') {{
if (document.readyState === 'loading') {{
await new Promise(resolve => {{
document.addEventListener('DOMContentLoaded', resolve);
}});
}}
return JSON.stringify({{ success: true }});
}} else if (state === 'load') {{
if (document.readyState !== 'complete') {{
await new Promise(resolve => {{
window.addEventListener('load', resolve);
}});
}}
return JSON.stringify({{ success: true }});
}} else if (state === 'networkidle') {{
// Simple network idle implementation
let pendingRequests = 0;
const observer = new PerformanceObserver((list) => {{
for (const entry of list.getEntries()) {{
if (entry.entryType === 'resource') {{
pendingRequests--;
}}
}}
}});
observer.observe({{ entryTypes: ['resource'] }});
while (Date.now() - startTime < timeout) {{
if (pendingRequests <= 0) {{
observer.disconnect();
return JSON.stringify({{ success: true }});
}}
await new Promise(r => setTimeout(r, 500));
}}
observer.disconnect();
return JSON.stringify({{ success: false, error: 'Timeout waiting for network idle' }});
}}
return JSON.stringify({{ success: true }});
}})()
"""
self.execute_javascript(script, request_id)
elif action == "wait_for_timeout":
timeout = params.get("timeout", 1000)
script = f"""
(async function() {{
await new Promise(r => setTimeout(r, {timeout}));
return JSON.stringify({{ success: true }});
}})()
"""
self.execute_javascript(script, request_id)
elif action == "get_attribute":
selector = params.get("selector")
name = params.get("name")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
const value = element.getAttribute({json.dumps(name)});
return JSON.stringify({{ success: true, value: value }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "inner_text":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
return JSON.stringify({{ success: true, text: element.innerText || '' }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "inner_html":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
return JSON.stringify({{ success: true, html: element.innerHTML || '' }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "is_visible":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: true, visible: false }});
}}
const style = window.getComputedStyle(element);
const isVisible = style.display !== 'none' &&
style.visibility !== 'hidden' &&
style.opacity !== '0' &&
element.offsetWidth > 0 &&
element.offsetHeight > 0;
return JSON.stringify({{ success: true, visible: isVisible }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "is_enabled":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
const isEnabled = !element.disabled;
return JSON.stringify({{ success: true, enabled: isEnabled }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "is_checked":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
if (!element) {{
return JSON.stringify({{ success: false, error: 'Element not found' }});
}}
const isChecked = element.checked || false;
return JSON.stringify({{ success: true, checked: isChecked }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "query_selector":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const element = document.querySelector({json.dumps(selector)});
return JSON.stringify({{ success: true, found: !!element }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
elif action == "query_selector_all":
selector = params.get("selector")
script = f"""
(function() {{
try {{
const elements = document.querySelectorAll({json.dumps(selector)});
return JSON.stringify({{ success: true, count: elements.length }});
}} catch (e) {{
return JSON.stringify({{ success: false, error: e.toString() }});
}}
}})()
"""
self._execute_simple_js(script, request_id)
else:
coro = self.send_response(request_id, None, f"Unknown Playwright action: {action}")
asyncio.run_coroutine_threadsafe(coro, self.loop)
async def handle_command(self, command: dict):
"""Handle commands from WebSocket client."""
cmd_type = command.get("command")
request_id = command.get("request_id", str(uuid.uuid4()))
if not cmd_type:
await self.send_response(request_id, None, "No command specified")
return
if not self.is_ready and cmd_type not in ["get_info", "close"]:
await self.send_response(request_id, None, "Browser not ready yet")
return
try:
# Backward compatible commands
if cmd_type == "navigate":
url = command.get("url")
if not url:
await self.send_response(request_id, None, "URL is required")
return
# Basic URL validation
if not url.startswith(('http://', 'https://', 'file://', 'about:')):
url = 'https://' + url
def safe_navigate():
try:
if self.webview and not self.is_closing:
self.webview.load_uri(url)
except Exception as e:
logger.error(f"Error navigating to {url}: {e}")
GLib.idle_add(safe_navigate)
await self.send_response(request_id, {"status": "navigating", "url": url})
elif cmd_type == "execute_js":
script = command.get("script")
if not script:
await self.send_response(request_id, None, "Script is required")
return
GLib.idle_add(self.execute_javascript, script, request_id)
elif cmd_type == "simulate_typing":
selector = command.get("selector")
text = command.get("text", "")
delay = command.get("delay", 0.1)
if not selector:
await self.send_response(request_id, None, "Selector is required")
return
GLib.idle_add(self.simulate_typing, selector, text, request_id, delay)
elif cmd_type == "go_back":
def safe_go_back():
try:
if self.webview and self.webview.can_go_back():
self.webview.go_back()
except Exception as e:
logger.error(f"Error going back: {e}")
GLib.idle_add(safe_go_back)
await self.send_response(request_id, {"status": "ok"})
elif cmd_type == "go_forward":
def safe_go_forward():
try:
if self.webview and self.webview.can_go_forward():
self.webview.go_forward()
except Exception as e:
logger.error(f"Error going forward: {e}")
GLib.idle_add(safe_go_forward)
await self.send_response(request_id, {"status": "ok"})
elif cmd_type == "reload":
def safe_reload():
try:
if self.webview and not self.is_closing:
self.webview.reload()
except Exception as e:
logger.error(f"Error reloading: {e}")
GLib.idle_add(safe_reload)
await self.send_response(request_id, {"status": "reloading"})
elif cmd_type == "get_info":
def get_browser_info():
try:
info = {
"is_ready": self.is_ready,
"is_closing": self.is_closing
}
if self.webview and self.is_ready:
try:
info["url"] = self.webview.get_uri()
except:
info["url"] = None
try:
info["title"] = self.webview.get_title()
except:
info["title"] = None
try:
info["is_loading"] = self.webview.is_loading()
except:
info["is_loading"] = False
try:
info["can_go_back"] = self.webview.can_go_back()
except:
info["can_go_back"] = False
try:
info["can_go_forward"] = self.webview.can_go_forward()
except:
info["can_go_forward"] = False
coro = self.send_response(request_id, info)
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"Error getting browser info: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
GLib.idle_add(get_browser_info)
elif cmd_type == "screenshot":
try:
with self._gtk_lock:
self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=60)
except Exception as e:
await self.send_response(request_id, None, f"Error preparing screenshot: {str(e)}")
return
def on_snapshot(webview, task, data):
try:
with self._gtk_lock:
callback = self.pending_callbacks.pop(request_id, None)
if not callback:
return
except Exception as e:
logger.error(f"Error retrieving screenshot callback: {e}")
return
try:
surface = webview.get_snapshot_finish(task)
# Save to bytes
import io
import cairo
buf = io.BytesIO()
surface.write_to_png(buf)
buf.seek(0)
screenshot_b64 = base64.b64encode(buf.read()).decode('utf-8')
coro = self.send_response(request_id, {"screenshot": screenshot_b64, "format": "png"})
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
error_msg = f"Error capturing screenshot: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
def take_snapshot():
try:
if self.webview and not self.is_closing:
self.webview.get_snapshot(
WebKit2.SnapshotRegion.FULL_DOCUMENT,
WebKit2.SnapshotOptions.NONE,
None,
on_snapshot,
None
)
else:
with self._gtk_lock:
self.pending_callbacks.pop(request_id, None)
coro = self.send_response(request_id, None, "Browser not available")
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
with self._gtk_lock:
self.pending_callbacks.pop(request_id, None)
error_msg = f"Error starting screenshot: {str(e)}"
logger.error(error_msg)
coro = self.send_response(request_id, None, error_msg)
asyncio.run_coroutine_threadsafe(coro, self.loop)
GLib.idle_add(take_snapshot)
elif cmd_type == "set_html":
html = command.get("html", "")
base_uri = command.get("base_uri")
def safe_load_html():
try:
if self.webview and not self.is_closing:
self.webview.load_html(html, base_uri)
except Exception as e:
logger.error(f"Error loading HTML: {e}")
GLib.idle_add(safe_load_html)
await self.send_response(request_id, {"status": "html_loaded"})
# Playwright-compatible commands
elif cmd_type == "goto":
url = command.get("url")
options = command.get("options", {})
if not url:
await self.send_response(request_id, None, "URL is required")
return
# Basic URL validation
if not url.startswith(('http://', 'https://', 'file://', 'about:')):
url = 'https://' + url
def safe_navigate():
try:
if self.webview and not self.is_closing:
self.webview.load_uri(url)
except Exception as e:
logger.error(f"Error navigating to {url}: {e}")
GLib.idle_add(safe_navigate)
await self.send_response(request_id, {"status": "navigating", "url": url})
elif cmd_type == "evaluate":
expression = command.get("expression")
if not expression:
await self.send_response(request_id, None, "Expression is required")
return
GLib.idle_add(self.execute_javascript, expression, request_id)
elif cmd_type == "content":
script = "document.documentElement.outerHTML"
GLib.idle_add(self.execute_javascript, script, request_id)
elif cmd_type == "title":
script = "document.title"
GLib.idle_add(self.execute_javascript, script, request_id)
elif cmd_type == "url":
def get_url():
try:
if self.webview and self.is_ready:
url = self.webview.get_uri()
coro = self.send_response(request_id, url)
else:
coro = self.send_response(request_id, None, "Browser not ready")
asyncio.run_coroutine_threadsafe(coro, self.loop)
except Exception as e:
coro = self.send_response(request_id, None, str(e))
asyncio.run_coroutine_threadsafe(coro, self.loop)
GLib.idle_add(get_url)
elif cmd_type == "set_content":
html = command.get("html", "")
options = command.get("options", {})
def safe_load_html():
try:
if self.webview and not self.is_closing:
self.webview.load_html(html, None)
except Exception as e:
logger.error(f"Error loading HTML: {e}")
GLib.idle_add(safe_load_html)
await self.send_response(request_id, {"status": "html_loaded"})
elif cmd_type == "type":
selector = command.get("selector")
text = command.get("text", "")
options = command.get("options", {})
delay = options.get("delay", 0)
if not selector:
await self.send_response(request_id, None, "Selector is required")
return
GLib.idle_add(self.simulate_typing, selector, text, request_id, delay / 1000.0)
elif cmd_type == "playwright":
# Handle Playwright-style actions
action = command.get("action")
params = command.get("params", {})
if not action:
await self.send_response(request_id, None, "Action is required")
return
GLib.idle_add(self.execute_playwright_action, action, params, request_id)
elif cmd_type == "close":
await self.send_response(request_id, {"status": "closing"})
await self.close()
else:
await self.send_response(request_id, None, f"Unknown command: {cmd_type}")
except Exception as e:
error_msg = f"Error handling command {cmd_type}: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
await self.send_response(request_id, None, error_msg)
async def close(self):
"""Close the browser window and cleanup."""
if self.is_closing:
return
self.is_closing = True
logger.info(f"Closing browser for connection {self.connection_id}")
# Cancel cleanup task
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# Send responses for all pending callbacks
async with self._lock:
for callback in list(self.pending_callbacks.values()):
await self.send_response(callback.request_id, None, "Browser closing")
self.pending_callbacks.clear()
def close_window():
try:
with self._gtk_lock:
if self.window:
self.window.destroy()
self.window = None
self.webview = None
except Exception as e:
logger.error(f"Error closing window: {e}")
GLib.idle_add(close_window)
# Remove from server connections
if self.connection_id in self.server.connections:
del self.server.connections[self.connection_id]
class BrowserServer:
"""
WebSocket server that manages multiple browser instances.
"""
def __init__(self, host: str = "localhost", port: int = 8765, max_connections: int = 10):
self.host = host
self.port = port
self.max_connections = max_connections
self.connections: Dict[str, RemoteBrowser] = {}
self.gtk_thread = None
self.gtk_ready = threading.Event()
self.server = None
self.is_running = False
self.executor = ThreadPoolExecutor(max_workers=4)
self.loop: Optional[asyncio.AbstractEventLoop] = None
self._shutdown_lock = threading.Lock()
def start_gtk_thread(self):
"""Start GTK main loop in a separate thread."""
def gtk_main():
try:
logger.info("Starting GTK main loop")
GLib.timeout_add(100, lambda: self.gtk_ready.set() or False)
Gtk.main()
except Exception as e:
logger.error(f"GTK main loop error: {e}")
finally:
logger.info("GTK main loop ended")
self.gtk_thread = threading.Thread(target=gtk_main, daemon=True)
self.gtk_thread.start()
# Wait for GTK to be ready
if not self.gtk_ready.wait(5):
raise Exception("GTK initialization timeout")
async def handle_connection(self, websocket, path):
"""Handle a new WebSocket connection."""
if len(self.connections) >= self.max_connections:
await websocket.close(1008, f"Maximum connections ({self.max_connections}) reached")
return
connection_id = str(uuid.uuid4())
logger.info(f"New connection: {connection_id}")
browser = RemoteBrowser(connection_id, websocket, self, self.loop)
self.connections[connection_id] = browser
try:
# Start browser tasks
await browser.start()
# Create browser in GTK thread
future = self.loop.create_future()
def create_browser_wrapper():
try:
browser.create_browser()
if browser._creation_error:
self.loop.call_soon_threadsafe(future.set_exception,
Exception(browser._creation_error))
else:
self.loop.call_soon_threadsafe(future.set_result, True)
except Exception as e:
self.loop.call_soon_threadsafe(future.set_exception, e)
GLib.idle_add(create_browser_wrapper)
# Wait for browser creation with timeout
await asyncio.wait_for(future, timeout=15.0)
# Send connection confirmation
await websocket.send(json.dumps({
"type": "connected",
"connection_id": connection_id,
"message": "Browser window created",
"webkit_version": webkit_version
}))
# Handle incoming messages
async for message in websocket:
if browser.is_closing:
break
request_id = None
try:
command = json.loads(message)
request_id = command.get("request_id")
await browser.handle_command(command)
except json.JSONDecodeError as e:
error_msg = f"Invalid JSON: {str(e)}"
logger.error(f"{error_msg} - Message: {message[:100]}...")
await browser.send_response(request_id, None, error_msg)
except Exception as e:
error_msg = f"Error handling message: {str(e)}"
logger.error(f"{error_msg} for request '{request_id}'\n{traceback.format_exc()}")
await browser.send_response(request_id, None, error_msg)
except asyncio.TimeoutError:
logger.error(f"Browser creation timeout for {connection_id}")
await browser.send_event("browser_error", {"error": "Browser creation timeout"})
except websockets.exceptions.ConnectionClosed:
logger.info(f"Connection closed: {connection_id}")
except Exception as e:
error_msg = f"Connection error {connection_id}: {str(e)}"
logger.error(f"{error_msg}\n{traceback.format_exc()}")
try:
await browser.send_event("server_error", {"error": str(e)})
except:
pass
finally:
logger.info(f"Cleaning up connection: {connection_id}")
await browser.close()
async def start_server(self):
"""Start the WebSocket server."""
logger.info(f"Starting WebSocket server on ws://{self.host}:{self.port}")
self.server = await websockets.serve(
self.handle_connection,
self.host,
self.port,
max_size=10**7, # 10MB max message size
ping_interval=20,
ping_timeout=10,
compression=None # Disable compression for stability
)
self.is_running = True
logger.info("WebSocket server started successfully")
# Wait for server to close
await self.server.wait_closed()
async def shutdown(self):
"""Gracefully shutdown the server."""
with self._shutdown_lock:
if not self.is_running:
return
logger.info("Shutting down server...")
self.is_running = False
# Close all browser connections
browsers = list(self.connections.values())
if browsers:
await asyncio.gather(
*[browser.close() for browser in browsers],
return_exceptions=True
)
# Close WebSocket server
if self.server:
self.server.close()
await self.server.wait_closed()
# Shutdown executor
self.executor.shutdown(wait=True)
# Stop GTK main loop
def quit_gtk():
try:
Gtk.main_quit()
except:
pass
GLib.idle_add(quit_gtk)
# Wait for GTK thread to finish
if self.gtk_thread and self.gtk_thread.is_alive():
self.gtk_thread.join(timeout=5)
logger.info("Server shutdown complete")
async def run(self):
"""Run the server with proper error handling."""
try:
self.loop = asyncio.get_running_loop()
self.start_gtk_thread()
await self.start_server()
except KeyboardInterrupt:
logger.info("Received interrupt signal")
except Exception as e:
logger.error(f"Server error: {e}\n{traceback.format_exc()}")
raise
finally:
await self.shutdown()
def main():
"""Main entry point with proper signal handling."""
import signal
server = BrowserServer()
# Setup signal handlers
def signal_handler(sig, frame):
logger.info(f"Received signal {sig}")
asyncio.create_task(server.shutdown())
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
asyncio.run(server.run())
except KeyboardInterrupt:
logger.info("Exiting.")
except Exception as e:
logger.error(f"Fatal error: {e}")
exit(1)
if __name__ == "__main__":
main()