diff --git a/Makefile b/Makefile
index 2334469..d9be655 100644
--- a/Makefile
+++ b/Makefile
@@ -1,2 +1,6 @@
install:
- sudo apt install python3-gi python3-gi-cairo gir1.2-gtk-3.0 gir1.2-webkit2-4.0
+ sudo apt install python3-gi python3-gi-cairo gir1.2-gtk-3.0
+ sudo apt install libgirepository1.0-dev
+ sudo apt install libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev pkg-config python3-dev
+
+
diff --git a/client.py b/client.py
index 26c4552..2f7bb6e 100644
--- a/client.py
+++ b/client.py
@@ -1,243 +1,701 @@
+#!/usr/bin/env python3
+"""
+Playwright-compatible WebSocket Browser Client
+Provides a Playwright-like API for controlling remote browsers via WebSocket.
+"""
+
import asyncio
import websockets
import json
+import uuid
+import time
import base64
-from typing import Optional, Dict, Any
+import logging
+from typing import Optional, Dict, Any, List, Callable, Union
+from dataclasses import dataclass
+from contextlib import asynccontextmanager
+import weakref
-class BrowserClient:
- """Client for controlling remote browser instances via WebSocket."""
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class TimeoutError(Exception):
+ """Raised when an operation times out"""
+ pass
+
+class BrowserError(Exception):
+ """Raised when a browser operation fails"""
+ pass
+
+@dataclass
+class ElementHandle:
+ """Represents a handle to a DOM element"""
+ page: 'Page'
+ selector: str
- def __init__(self, uri: str = "ws://localhost:8765"):
- self.uri = uri
- self.websocket = None
- self.connection_id = None
- self.request_counter = 0
- self.pending_responses = {}
+ async def click(self, **options):
+ """Click the element"""
+ return await self.page._playwright_action("click", {"selector": self.selector, "options": options})
+
+ async def fill(self, value: str):
+ """Fill the element with text"""
+ return await self.page._playwright_action("fill", {"selector": self.selector, "value": value})
+
+ async def type(self, text: str, delay: int = 0):
+ """Type text into the element"""
+ return await self.page.type(self.selector, text, delay=delay)
+
+ async def press(self, key: str):
+ """Press a key"""
+ return await self.page._playwright_action("press", {"selector": self.selector, "key": key})
+
+ async def hover(self):
+ """Hover over the element"""
+ return await self.page._playwright_action("hover", {"selector": self.selector})
+
+ async def focus(self):
+ """Focus the element"""
+ return await self.page._playwright_action("focus", {"selector": self.selector})
+
+ async def get_attribute(self, name: str) -> Optional[str]:
+ """Get element attribute"""
+ result = await self.page._playwright_action("get_attribute", {"selector": self.selector, "name": name})
+ return result.get("value") if result.get("success") else None
+
+ async def inner_text(self) -> str:
+ """Get inner text"""
+ result = await self.page._playwright_action("inner_text", {"selector": self.selector})
+ return result.get("text", "") if result.get("success") else ""
+
+ async def inner_html(self) -> str:
+ """Get inner HTML"""
+ result = await self.page._playwright_action("inner_html", {"selector": self.selector})
+ return result.get("html", "") if result.get("success") else ""
+
+ async def is_visible(self) -> bool:
+ """Check if element is visible"""
+ result = await self.page._playwright_action("is_visible", {"selector": self.selector})
+ return result.get("visible", False) if result.get("success") else False
+
+ async def is_enabled(self) -> bool:
+ """Check if element is enabled"""
+ result = await self.page._playwright_action("is_enabled", {"selector": self.selector})
+ return result.get("enabled", False) if result.get("success") else False
+
+ async def is_checked(self) -> bool:
+ """Check if element is checked"""
+ result = await self.page._playwright_action("is_checked", {"selector": self.selector})
+ return result.get("checked", False) if result.get("success") else False
+
+ async def check(self):
+ """Check a checkbox or radio button"""
+ return await self.page._playwright_action("check", {"selector": self.selector})
+
+ async def uncheck(self):
+ """Uncheck a checkbox"""
+ return await self.page._playwright_action("uncheck", {"selector": self.selector})
+
+ async def select_option(self, values: Union[str, List[str]]):
+ """Select option(s) in a select element"""
+ return await self.page._playwright_action("select_option", {"selector": self.selector, "values": values})
+
+class Locator:
+ """Playwright-style locator for finding elements"""
+
+ def __init__(self, page: 'Page', selector: str):
+ self.page = page
+ self.selector = selector
+
+ async def click(self, **options):
+ """Click the first matching element"""
+ return await self.page.click(self.selector, **options)
+
+ async def fill(self, value: str):
+ """Fill the first matching element"""
+ return await self.page.fill(self.selector, value)
+
+ async def type(self, text: str, delay: int = 0):
+ """Type into the first matching element"""
+ return await self.page.type(self.selector, text, delay=delay)
+
+ async def press(self, key: str):
+ """Press a key on the first matching element"""
+ return await self.page.press(self.selector, key)
+
+ async def hover(self):
+ """Hover over the first matching element"""
+ return await self.page.hover(self.selector)
+
+ async def focus(self):
+ """Focus the first matching element"""
+ return await self.page.focus(self.selector)
+
+ async def get_attribute(self, name: str) -> Optional[str]:
+ """Get attribute of the first matching element"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.get_attribute(name)
+
+ async def inner_text(self) -> str:
+ """Get inner text of the first matching element"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.inner_text()
+
+ async def inner_html(self) -> str:
+ """Get inner HTML of the first matching element"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.inner_html()
+
+ async def is_visible(self) -> bool:
+ """Check if the first matching element is visible"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.is_visible()
+
+ async def is_enabled(self) -> bool:
+ """Check if the first matching element is enabled"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.is_enabled()
+
+ async def is_checked(self) -> bool:
+ """Check if the first matching element is checked"""
+ element = ElementHandle(self.page, self.selector)
+ return await element.is_checked()
+
+ async def check(self):
+ """Check the first matching checkbox or radio"""
+ return await self.page.check(self.selector)
+
+ async def uncheck(self):
+ """Uncheck the first matching checkbox"""
+ return await self.page.uncheck(self.selector)
+
+ async def select_option(self, values: Union[str, List[str]]):
+ """Select option(s) in the first matching select"""
+ return await self.page.select_option(self.selector, values)
+
+ async def count(self) -> int:
+ """Count matching elements"""
+ result = await self.page._playwright_action("query_selector_all", {"selector": self.selector})
+ return result.get("count", 0) if result.get("success") else 0
+
+ async def first(self) -> ElementHandle:
+ """Get the first matching element"""
+ return ElementHandle(self.page, self.selector)
+
+ async def wait_for(self, **options):
+ """Wait for the element to appear"""
+ return await self.page.wait_for_selector(self.selector, **options)
+
+class Page:
+ """Represents a browser page with Playwright-compatible API"""
+
+ def __init__(self, browser: 'Browser', connection_id: str):
+ self.browser = browser
+ self.connection_id = connection_id
+ self._closed = False
+ self._event_listeners: Dict[str, List[Callable]] = {}
- async def connect(self):
- """Connect to the browser server."""
- self.websocket = await websockets.connect(self.uri)
+ async def goto(self, url: str, **options) -> Dict[str, Any]:
+ """Navigate to a URL"""
+ return await self.browser._send_command({
+ "command": "goto",
+ "url": url,
+ "options": options
+ })
+
+ async def go_back(self, **options) -> Dict[str, Any]:
+ """Navigate back"""
+ return await self.browser._send_command({"command": "go_back"})
+
+ async def go_forward(self, **options) -> Dict[str, Any]:
+ """Navigate forward"""
+ return await self.browser._send_command({"command": "go_forward"})
+
+ async def reload(self, **options) -> Dict[str, Any]:
+ """Reload the page"""
+ return await self.browser._send_command({"command": "reload"})
+
+ async def set_content(self, html: str, **options) -> Dict[str, Any]:
+ """Set page content"""
+ return await self.browser._send_command({
+ "command": "set_content",
+ "html": html,
+ "options": options
+ })
+
+ async def content(self) -> str:
+ """Get page content"""
+ result = await self.browser._send_command({"command": "content"})
+ return result if isinstance(result, str) else ""
+
+ async def title(self) -> str:
+ """Get page title"""
+ result = await self.browser._send_command({"command": "title"})
+ return result if isinstance(result, str) else ""
+
+ async def url(self) -> str:
+ """Get page URL"""
+ result = await self.browser._send_command({"command": "url"})
+ return result if isinstance(result, str) else ""
+
+ async def evaluate(self, expression: str, arg=None) -> Any:
+ """Evaluate JavaScript in the page"""
+ if arg is not None:
+ # Simple argument serialization
+ expression = f"({expression})({json.dumps(arg)})"
+ return await self.browser._send_command({
+ "command": "evaluate",
+ "expression": expression
+ })
+
+ async def evaluate_handle(self, expression: str, arg=None) -> Any:
+ """Evaluate JavaScript and return a handle (simplified)"""
+ return await self.evaluate(expression, arg)
+
+ async def screenshot(self, **options) -> bytes:
+ """Take a screenshot"""
+ result = await self.browser._send_command({
+ "command": "screenshot",
+ "options": options
+ })
+ if isinstance(result, dict) and "screenshot" in result:
+ return base64.b64decode(result["screenshot"])
+ raise BrowserError("Failed to capture screenshot")
+
+ async def click(self, selector: str, **options) -> Dict[str, Any]:
+ """Click an element"""
+ return await self._playwright_action("click", {"selector": selector, "options": options})
+
+ async def dblclick(self, selector: str, **options) -> Dict[str, Any]:
+ """Double-click an element"""
+ # Simulate double click with two clicks
+ await self.click(selector, **options)
+ return await self.click(selector, **options)
+
+ async def fill(self, selector: str, value: str, **options) -> Dict[str, Any]:
+ """Fill an input element"""
+ return await self._playwright_action("fill", {"selector": selector, "value": value})
+
+ async def type(self, selector: str, text: str, delay: int = 0) -> Dict[str, Any]:
+ """Type text into an element"""
+ return await self.browser._send_command({
+ "command": "type",
+ "selector": selector,
+ "text": text,
+ "options": {"delay": delay}
+ })
+
+ async def press(self, selector: str, key: str, **options) -> Dict[str, Any]:
+ """Press a key"""
+ return await self._playwright_action("press", {"selector": selector, "key": key})
+
+ async def check(self, selector: str, **options) -> Dict[str, Any]:
+ """Check a checkbox or radio button"""
+ return await self._playwright_action("check", {"selector": selector})
+
+ async def uncheck(self, selector: str, **options) -> Dict[str, Any]:
+ """Uncheck a checkbox"""
+ return await self._playwright_action("uncheck", {"selector": selector})
+
+ async def select_option(self, selector: str, values: Union[str, List[str]], **options) -> Dict[str, Any]:
+ """Select option(s) in a select element"""
+ return await self._playwright_action("select_option", {"selector": selector, "values": values})
+
+ async def hover(self, selector: str, **options) -> Dict[str, Any]:
+ """Hover over an element"""
+ return await self._playwright_action("hover", {"selector": selector})
+
+ async def focus(self, selector: str, **options) -> Dict[str, Any]:
+ """Focus an element"""
+ return await self._playwright_action("focus", {"selector": selector})
+
+ async def wait_for_selector(self, selector: str, **options) -> Optional[ElementHandle]:
+ """Wait for a selector to appear"""
+ result = await self._playwright_action("wait_for_selector", {
+ "selector": selector,
+ "options": options
+ })
+ if result.get("success"):
+ return ElementHandle(self, selector)
+ return None
+
+ async def wait_for_load_state(self, state: str = "load", **options) -> None:
+ """Wait for a load state"""
+ await self._playwright_action("wait_for_load_state", {
+ "state": state,
+ "timeout": options.get("timeout", 30000)
+ })
+
+ async def wait_for_timeout(self, timeout: int) -> None:
+ """Wait for a timeout"""
+ await self._playwright_action("wait_for_timeout", {"timeout": timeout})
+
+ async def wait_for_function(self, expression: str, **options) -> Any:
+ """Wait for a function to return true"""
+ # Simplified implementation using polling
+ timeout = options.get("timeout", 30000)
+ polling = options.get("polling", 100)
+ start_time = time.time()
- # Get connection confirmation
- response = await self.websocket.recv()
- data = json.loads(response)
- self.connection_id = data.get("connection_id")
- print(f"Connected to browser: {self.connection_id}")
+ while (time.time() - start_time) * 1000 < timeout:
+ result = await self.evaluate(expression)
+ if result:
+ return result
+ await asyncio.sleep(polling / 1000)
- # Start response handler
- asyncio.create_task(self._response_handler())
+ raise TimeoutError(f"Timeout waiting for function: {expression}")
+
+ def locator(self, selector: str) -> Locator:
+ """Create a locator for the given selector"""
+ return Locator(self, selector)
+
+ async def query_selector(self, selector: str) -> Optional[ElementHandle]:
+ """Query for a single element"""
+ result = await self._playwright_action("query_selector", {"selector": selector})
+ if result.get("success") and result.get("found"):
+ return ElementHandle(self, selector)
+ return None
+
+ async def query_selector_all(self, selector: str) -> List[ElementHandle]:
+ """Query for all matching elements"""
+ result = await self._playwright_action("query_selector_all", {"selector": selector})
+ if result.get("success"):
+ count = result.get("count", 0)
+ # For simplicity, we return handles with indexed selectors
+ return [ElementHandle(self, f"{selector}:nth-of-type({i+1})") for i in range(count)]
+ return []
+
+ async def _playwright_action(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
+ """Execute a Playwright-style action"""
+ result = await self.browser._send_command({
+ "command": "playwright",
+ "action": action,
+ "params": params
+ })
- async def _response_handler(self):
- """Handle responses and events from the server."""
+ # Handle string results from JavaScript execution
+ if isinstance(result, str):
+ try:
+ return json.loads(result)
+ except json.JSONDecodeError:
+ return {"success": False, "error": f"Invalid response: {result}"}
+
+ return result or {}
+
+ def on(self, event: str, callback: Callable) -> None:
+ """Register an event listener"""
+ if event not in self._event_listeners:
+ self._event_listeners[event] = []
+ self._event_listeners[event].append(callback)
+
+ def once(self, event: str, callback: Callable) -> None:
+ """Register a one-time event listener"""
+ def wrapper(*args, **kwargs):
+ self.remove_listener(event, wrapper)
+ return callback(*args, **kwargs)
+ self.on(event, wrapper)
+
+ def remove_listener(self, event: str, callback: Callable) -> None:
+ """Remove an event listener"""
+ if event in self._event_listeners:
+ self._event_listeners[event] = [
+ cb for cb in self._event_listeners[event] if cb != callback
+ ]
+
+ def emit(self, event: str, data: Any) -> None:
+ """Emit an event to all listeners"""
+ if event in self._event_listeners:
+ for callback in self._event_listeners[event][:]:
+ try:
+ callback(data)
+ except Exception as e:
+ logger.error(f"Error in event listener for {event}: {e}")
+
+ async def close(self) -> None:
+ """Close the page"""
+ if not self._closed:
+ self._closed = True
+ await self.browser._send_command({"command": "close"})
+
+class BrowserContext:
+ """Browser context (simplified for single-page support)"""
+
+ def __init__(self, browser: 'Browser'):
+ self.browser = browser
+ self.pages: List[Page] = []
+
+ async def new_page(self) -> Page:
+ """Create a new page (currently limited to one per context)"""
+ if self.pages:
+ raise BrowserError("Multiple pages not supported in this implementation")
+
+ page = Page(self.browser, self.browser.connection_id)
+ self.pages.append(page)
+ return page
+
+ async def close(self) -> None:
+ """Close the context and all its pages"""
+ for page in self.pages[:]:
+ await page.close()
+ self.pages.clear()
+
+class Browser:
+ """Represents a browser instance with Playwright-compatible API"""
+
+ def __init__(self, ws_url: str = "ws://localhost:8765"):
+ self.ws_url = ws_url
+ self.websocket: Optional[websockets.WebSocketClientProtocol] = None
+ self.connection_id: Optional[str] = None
+ self._response_futures: Dict[str, asyncio.Future] = {}
+ self._event_listeners: Dict[str, List[Callable]] = {}
+ self._receive_task: Optional[asyncio.Task] = None
+ self._closed = False
+ self.contexts: List[BrowserContext] = []
+
+ async def connect(self) -> None:
+ """Connect to the browser server"""
+ self.websocket = await websockets.connect(self.ws_url)
+ self._receive_task = asyncio.create_task(self._receive_messages())
+
+ # Wait for connection confirmation
+ timeout = 15
+ start_time = time.time()
+ while not self.connection_id and time.time() - start_time < timeout:
+ await asyncio.sleep(0.1)
+
+ if not self.connection_id:
+ raise TimeoutError("Failed to establish browser connection")
+
+ async def _receive_messages(self) -> None:
+ """Receive messages from the WebSocket"""
try:
async for message in self.websocket:
- data = json.loads(message)
-
- if data["type"] == "response":
- request_id = data.get("request_id")
- if request_id in self.pending_responses:
- self.pending_responses[request_id].set_result(data)
-
- elif data["type"] == "event":
- print(f"Event: {data['event']} - {data['data']}")
+ try:
+ data = json.loads(message)
+ msg_type = data.get("type")
+ if msg_type == "connected":
+ self.connection_id = data.get("connection_id")
+ logger.info(f"Connected with ID: {self.connection_id}")
+
+ elif msg_type == "response":
+ request_id = data.get("request_id")
+ if request_id in self._response_futures:
+ future = self._response_futures.pop(request_id)
+ if data.get("error"):
+ future.set_exception(BrowserError(data["error"]))
+ else:
+ future.set_result(data.get("result"))
+
+ elif msg_type == "event":
+ event_type = data.get("event")
+ event_data = data.get("data", {})
+
+ # Emit to browser-level listeners
+ self._emit_event(event_type, event_data)
+
+ # Emit to page-level listeners if applicable
+ for context in self.contexts:
+ for page in context.pages:
+ page.emit(event_type, event_data)
+
+ except json.JSONDecodeError as e:
+ logger.error(f"Failed to parse message: {e}")
+ except Exception as e:
+ logger.error(f"Error handling message: {e}")
+
except websockets.exceptions.ConnectionClosed:
- print("Connection closed")
-
- async def _send_command(self, command: str, **kwargs) -> Dict[str, Any]:
- """Send a command and wait for response."""
- self.request_counter += 1
- request_id = f"req_{self.request_counter}"
+ logger.info("WebSocket connection closed")
+ except Exception as e:
+ logger.error(f"Error in receive loop: {e}")
+ finally:
+ # Clean up pending futures
+ for future in self._response_futures.values():
+ if not future.done():
+ future.set_exception(BrowserError("Connection closed"))
+ self._response_futures.clear()
+
+ async def _send_command(self, command: Dict[str, Any]) -> Any:
+ """Send a command and wait for response"""
+ if not self.websocket or self._closed:
+ raise BrowserError("WebSocket connection is closed")
+
+ request_id = str(uuid.uuid4())
+ command["request_id"] = request_id
# Create future for response
future = asyncio.Future()
- self.pending_responses[request_id] = future
+ self._response_futures[request_id] = future
- # Send command
- await self.websocket.send(json.dumps({
- "command": command,
- "request_id": request_id,
- **kwargs
- }))
-
- # Wait for response
try:
- response = await asyncio.wait_for(future, timeout=10.0)
- del self.pending_responses[request_id]
+ await self.websocket.send(json.dumps(command))
- if response.get("error"):
- raise Exception(response["error"])
-
- return response.get("result")
+ # Wait for response with timeout
+ timeout = command.get("timeout", 30)
+ return await asyncio.wait_for(future, timeout=timeout)
+
except asyncio.TimeoutError:
- del self.pending_responses[request_id]
- raise Exception("Command timeout")
-
- async def navigate(self, url: str) -> Dict[str, Any]:
- """Navigate to a URL."""
- return await self._send_command("navigate", url=url)
+ self._response_futures.pop(request_id, None)
+ raise TimeoutError(f"Command timeout: {command.get('command')}")
+ except Exception as e:
+ self._response_futures.pop(request_id, None)
+ raise BrowserError(f"Command failed: {e}")
+
+ def _emit_event(self, event: str, data: Any) -> None:
+ """Emit an event to all browser-level listeners"""
+ if event in self._event_listeners:
+ for callback in self._event_listeners[event][:]:
+ try:
+ callback(data)
+ except Exception as e:
+ logger.error(f"Error in browser event listener for {event}: {e}")
+
+ def on(self, event: str, callback: Callable) -> None:
+ """Register a browser-level event listener"""
+ if event not in self._event_listeners:
+ self._event_listeners[event] = []
+ self._event_listeners[event].append(callback)
+
+ def once(self, event: str, callback: Callable) -> None:
+ """Register a one-time browser-level event listener"""
+ def wrapper(*args, **kwargs):
+ self.remove_listener(event, wrapper)
+ return callback(*args, **kwargs)
+ self.on(event, wrapper)
+
+ def remove_listener(self, event: str, callback: Callable) -> None:
+ """Remove a browser-level event listener"""
+ if event in self._event_listeners:
+ self._event_listeners[event] = [
+ cb for cb in self._event_listeners[event] if cb != callback
+ ]
+
+ async def new_context(self, **options) -> BrowserContext:
+ """Create a new browser context"""
+ context = BrowserContext(self)
+ self.contexts.append(context)
+ return context
+
+ async def new_page(self) -> Page:
+ """Create a new page in the default context"""
+ if not self.contexts:
+ context = await self.new_context()
+ else:
+ context = self.contexts[0]
+ return await context.new_page()
+
+ async def close(self) -> None:
+ """Close the browser"""
+ if self._closed:
+ return
- async def execute_js(self, script: str) -> Any:
- """Execute JavaScript and return result."""
- return await self._send_command("execute_js", script=script)
+ self._closed = True
- async def go_back(self):
- """Go back in history."""
- return await self._send_command("go_back")
+ # Close all contexts
+ for context in self.contexts[:]:
+ await context.close()
- async def go_forward(self):
- """Go forward in history."""
- return await self._send_command("go_forward")
+ # Cancel receive task
+ if self._receive_task:
+ self._receive_task.cancel()
+ try:
+ await self._receive_task
+ except asyncio.CancelledError:
+ pass
- async def reload(self):
- """Reload the current page."""
- return await self._send_command("reload")
-
- async def stop(self):
- """Stop loading."""
- return await self._send_command("stop")
-
- async def get_info(self) -> Dict[str, Any]:
- """Get browser information."""
- return await self._send_command("get_info")
-
- async def screenshot(self, save_path: Optional[str] = None) -> str:
- """Take a screenshot. Returns base64 data or saves to file."""
- result = await self._send_command("screenshot")
- screenshot_b64 = result.get("screenshot")
-
- if save_path and screenshot_b64:
- with open(save_path, "wb") as f:
- f.write(base64.b64decode(screenshot_b64))
- return save_path
-
- return screenshot_b64
-
- async def set_html(self, html: str, base_uri: Optional[str] = None):
- """Load custom HTML content."""
- return await self._send_command("set_html", html=html, base_uri=base_uri)
-
- async def close(self):
- """Close the connection."""
+ # Close WebSocket
if self.websocket:
await self.websocket.close()
-
-
-# Example automation functions
-
-async def scrape_page_title(client: BrowserClient, url: str) -> str:
- """Example: Scrape page title."""
- await client.navigate(url)
- await asyncio.sleep(2) # Wait for page load
- title = await client.execute_js("document.title")
- return title
-
-async def fill_and_submit_form(client: BrowserClient, url: str):
- """Example: Fill and submit a form."""
- await client.navigate(url)
- await asyncio.sleep(2)
+
+ logger.info("Browser closed")
- # Fill form fields
- await client.execute_js("""
- document.querySelector('#username').value = 'testuser';
- document.querySelector('#email').value = 'test@example.com';
- """)
-
- # Submit form
- await client.execute_js("document.querySelector('#submit-button').click()")
-
-async def extract_all_links(client: BrowserClient, url: str) -> list:
- """Example: Extract all links from a page."""
- await client.navigate(url)
- await asyncio.sleep(2)
-
- links = await client.execute_js("""
- Array.from(document.querySelectorAll('a[href]')).map(a => ({
- text: a.textContent.trim(),
- href: a.href
- }))
- """)
- return links
+ @property
+ def is_connected(self) -> bool:
+ """Check if browser is connected"""
+ return bool(self.websocket and not self.websocket.closed and self.connection_id)
-async def monitor_page_changes(client: BrowserClient, url: str, selector: str, interval: int = 5):
- """Example: Monitor a page element for changes."""
- await client.navigate(url)
- await asyncio.sleep(2)
+class Playwright:
+ """Main Playwright-compatible API entry point"""
- last_value = None
- while True:
- try:
- current_value = await client.execute_js(f"document.querySelector('{selector}')?.textContent")
-
- if current_value != last_value:
- print(f"Change detected: {last_value} -> {current_value}")
- last_value = current_value
-
- await asyncio.sleep(interval)
- except Exception as e:
- print(f"Monitoring error: {e}")
- break
-
-
-# Main example
-async def main():
- """Example usage of the browser client."""
- client = BrowserClient()
+ def __init__(self):
+ self.browsers: List[Browser] = []
+ class chromium:
+ """Chromium browser launcher (uses WebKit in this implementation)"""
+
+ @staticmethod
+ async def launch(**options) -> Browser:
+ """Launch a browser instance"""
+ ws_url = options.get("ws_endpoint", "ws://localhost:8765")
+ browser = Browser(ws_url)
+ await browser.connect()
+ return browser
+
+ class firefox:
+ """Firefox browser launcher (uses WebKit in this implementation)"""
+
+ @staticmethod
+ async def launch(**options) -> Browser:
+ """Launch a browser instance"""
+ return await Playwright.chromium.launch(**options)
+
+ class webkit:
+ """WebKit browser launcher"""
+
+ @staticmethod
+ async def launch(**options) -> Browser:
+ """Launch a browser instance"""
+ return await Playwright.chromium.launch(**options)
+
+# Convenience function for async context manager
+@asynccontextmanager
+async def browser(**options):
+ """Async context manager for browser"""
+ playwright = Playwright()
+ browser_instance = await playwright.chromium.launch(**options)
try:
- # Connect to server
- await client.connect()
-
- # Example 1: Basic navigation and JS execution
- print("\n1. Basic navigation:")
- await client.navigate("https://www.example.com")
- await asyncio.sleep(2)
-
- title = await client.execute_js("document.title")
- print(f"Page title: {title}")
-
- # Example 2: Get page info
- print("\n2. Browser info:")
- info = await client.get_info()
- print(f"Current URL: {info['url']}")
- print(f"Can go back: {info['can_go_back']}")
-
- # Example 3: Custom HTML
- print("\n3. Loading custom HTML:")
- await client.set_html("""
-
-
Test Page
-
- WebSocket Browser Control
- This page was loaded via WebSocket!
-
-
-
- """)
-
- await asyncio.sleep(1)
- content = await client.execute_js("document.getElementById('content').textContent")
- print(f"Content: {content}")
-
- # Example 4: Screenshot
- print("\n4. Taking screenshot:")
- await client.screenshot("screenshot.png")
- print("Screenshot saved to screenshot.png")
-
- # Example 5: Extract links from a real page
- print("\n5. Extracting links:")
- links = await extract_all_links(client, "https://www.python.org")
- print(f"Found {len(links)} links")
- for link in links[:5]: # Show first 5
- print(f" - {link['text']}: {link['href']}")
-
- # Keep connection open for a bit to see any events
- print("\nWaiting for events...")
- await asyncio.sleep(5)
-
+ yield browser_instance
finally:
- await client.close()
+ await browser_instance.close()
+# Convenience function for creating a browser and page
+async def launch(**options) -> Browser:
+ """Launch a browser instance"""
+ playwright = Playwright()
+ return await playwright.chromium.launch(**options)
-if __name__ == "__main__":
- asyncio.run(main())
+# Example usage functions for compatibility
+async def goto(page: Page, url: str, **options):
+ """Navigate to URL (convenience function)"""
+ return await page.goto(url, **options)
+
+async def screenshot(page: Page, path: str = None, **options):
+ """Take screenshot (convenience function)"""
+ data = await page.screenshot(**options)
+ if path:
+ with open(path, 'wb') as f:
+ f.write(data)
+ return data
+
+async def evaluate(page: Page, expression: str, arg=None):
+ """Evaluate JavaScript (convenience function)"""
+ return await page.evaluate(expression, arg)
+
+# Selector engine helpers
+def css(selector: str) -> str:
+ """CSS selector helper"""
+ return selector
+
+def xpath(selector: str) -> str:
+ """XPath selector helper (converted to CSS where possible)"""
+ # Simple XPath to CSS conversion for common cases
+ if selector.startswith("//"):
+ return selector # Return as-is, will need special handling
+ return selector
+
+def text(text_content: str) -> str:
+ """Text selector helper"""
+ return f"//*[contains(text(), '{text_content}')]"
diff --git a/demo.py b/demo.py
index ca37120..a7df8e2 100644
--- a/demo.py
+++ b/demo.py
@@ -1,232 +1,679 @@
-import asyncio
-import websockets
-import json
+#!/usr/bin/env python3
+"""
+Comprehensive Demo of Playwright-style WebSocket Browser Control
+Shows both backward-compatible and new Playwright-style APIs.
+"""
-async def create_browser_window(window_id: int, url: str):
- """Create and control a single browser window."""
- uri = "ws://localhost:8765"
+import asyncio
+import logging
+import sys
+import os
+from datetime import datetime
+from client import Browser, Playwright, browser
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+async def demo_basic_navigation():
+ """Demo basic navigation features"""
+ print("\n=== Basic Navigation Demo ===")
- async with websockets.connect(uri) as websocket:
-
- response = await websocket.recv()
- conn_data = json.loads(response)
- print(f"Window {window_id} connected: {conn_data['connection_id'][:8]}")
+ # Create browser instance
+ browser_instance = Browser("ws://localhost:8765")
+ await browser_instance.connect()
+
+ try:
+ # Create a page
+ page = await browser_instance.new_page()
-
- await websocket.send(json.dumps({
- "command": "navigate",
- "url": url,
- "request_id": f"nav_{window_id}"
- }))
-
-
- await websocket.recv()
+ # Navigate to a website
+ print("Navigating to example.com...")
+ await page.goto("https://example.com")
await asyncio.sleep(2)
-
- await websocket.send(json.dumps({
- "command": "execute_js",
- "script": f"document.body.style.backgroundColor = '#{window_id:02x}0000'; document.title",
- "request_id": f"js_{window_id}"
- }))
+ # Get page info
+ title = await page.title()
+ url = await page.url()
+ print(f"Title: {title}")
+ print(f"URL: {url}")
- response = await websocket.recv()
- data = json.loads(response)
- print(f"Window {window_id} - Title: {data.get('result')}")
+ # Navigate to another page
+ print("\nNavigating to Python.org...")
+ await page.goto("https://www.python.org")
+ await asyncio.sleep(2)
-
- await asyncio.sleep(10)
+ # Go back
+ print("Going back...")
+ await page.go_back()
+ await asyncio.sleep(2)
- print(f"Window {window_id} closing...")
-
-
-async def parallel_browser_demo():
- """Demo: Open multiple browser windows in parallel."""
- urls = [
- "https://www.python.org",
- "https://www.github.com",
- "https://www.example.com",
- "https://www.wikipedia.org"
- ]
-
-
- tasks = []
- for i, url in enumerate(urls):
- task = asyncio.create_task(create_browser_window(i + 1, url))
- tasks.append(task)
- await asyncio.sleep(0.5)
-
-
- await asyncio.gather(*tasks)
-
- print("All browser windows closed.")
-
-
-async def automated_testing_demo():
- """Demo: Automated testing across multiple sites."""
- test_sites = [
- {"url": "https://www.example.com", "selector": "h1"},
- {"url": "https://www.python.org", "selector": ".introduction h1"},
- {"url": "https://httpbin.org/html", "selector": "h1"},
- ]
-
- async def test_site(site_info):
- uri = "ws://localhost:8765"
+ # Go forward
+ print("Going forward...")
+ await page.go_forward()
+ await asyncio.sleep(2)
- async with websockets.connect(uri) as websocket:
-
- await websocket.recv()
-
-
- await websocket.send(json.dumps({
- "command": "navigate",
- "url": site_info["url"]
- }))
- await websocket.recv()
- await asyncio.sleep(3)
-
-
- await websocket.send(json.dumps({
- "command": "execute_js",
- "script": f"document.querySelector('{site_info['selector']}')?.textContent || 'Not found'"
- }))
-
- response = await websocket.recv()
- data = json.loads(response)
- heading = data.get("result", "Error")
-
-
- await websocket.send(json.dumps({
- "command": "screenshot"
- }))
-
- screenshot_response = await websocket.recv()
- screenshot_data = json.loads(screenshot_response)
-
- print(f"Site: {site_info['url']}")
- print(f" Heading: {heading}")
- print(f" Screenshot: {'✓' if screenshot_data.get('result') else '✗'}")
- print()
-
-
- tasks = [test_site(site) for site in test_sites]
- await asyncio.gather(*tasks)
-
-
-async def form_automation_demo():
- """Demo: Fill forms in multiple windows."""
- uri = "ws://localhost:8765"
-
- async with websockets.connect(uri) as websocket:
-
- await websocket.recv()
+ # Reload
+ print("Reloading page...")
+ await page.reload()
+ await asyncio.sleep(2)
-
- html = """
+ finally:
+ await browser_instance.close()
+
+async def demo_playwright_style():
+ """Demo Playwright-style API usage"""
+ print("\n=== Playwright-Style API Demo ===")
+
+ # Using Playwright launcher
+ playwright = Playwright()
+ browser_instance = await playwright.chromium.launch(ws_endpoint="ws://localhost:8765")
+
+ try:
+ # Create context and page
+ context = await browser_instance.new_context()
+ page = await context.new_page()
+
+ # Navigate and wait for load
+ print("Navigating with Playwright-style API...")
+ await page.goto("https://www.wikipedia.org")
+ await page.wait_for_load_state("load")
+
+ # Use locators
+ print("Using locators to find elements...")
+ search_box = page.locator('input[name="search"]')
+
+ # Type into search box
+ print("Typing 'Python programming' into search...")
+ await search_box.type("Python programming", delay=50)
+ await asyncio.sleep(1)
+
+ # Press Enter
+ print("Pressing Enter...")
+ await search_box.press("Enter")
+ await page.wait_for_load_state("load")
+ await asyncio.sleep(2)
+
+ # Take screenshot
+ print("Taking screenshot...")
+ screenshot_data = await page.screenshot()
+ with open("wikipedia_search.png", "wb") as f:
+ f.write(screenshot_data)
+ print("Screenshot saved as wikipedia_search.png")
+
+ finally:
+ await browser_instance.close()
+
+async def demo_form_interaction():
+ """Demo form interaction capabilities"""
+ print("\n=== Form Interaction Demo ===")
+
+ async with browser(ws_endpoint="ws://localhost:8765") as browser_instance:
+ page = await browser_instance.new_page()
+
+ # Create a test form
+ html_content = """
+
- Form Automation Demo
+ Test Form
- Automated Form Demo
-
-
+
+
"""
- await websocket.send(json.dumps({
- "command": "set_html",
- "html": html
- }))
- await websocket.recv()
+ # Set the content
+ print("Loading test form...")
+ await page.set_content(html_content)
- print("Form loaded. Automating form filling...")
+ # Fill the form using different methods
+ print("\nFilling form fields...")
+
+ # Method 1: Using fill()
+ await page.fill("#name", "John Doe")
+
+ # Method 2: Using type() for more realistic typing
+ await page.type("#email", "john.doe@example.com", delay=50)
+
+ # Method 3: Using locator
+ country_select = page.locator("#country")
+ await country_select.select_option("us")
+
+ # Check checkbox
+ print("Checking newsletter checkbox...")
+ await page.check("#newsletter")
+
+ # Select radio button
+ print("Selecting gender radio button...")
+ await page.click('input[name="gender"][value="male"]')
+
+ # Fill textarea
+ print("Adding comments...")
+ comments_field = page.locator("#comments")
+ await comments_field.fill("This is a test comment\nWith multiple lines\nUsing Playwright-style API")
+
+ # Submit form
+ print("Submitting form...")
+ await page.click('button[type="submit"]')
await asyncio.sleep(1)
-
- fields = [
- ("document.getElementById('name').value = 'John Doe'", "Filled name"),
- ("document.getElementById('email').value = 'john@example.com'", "Filled email"),
- ("document.getElementById('country').value = 'US'", "Selected country"),
- ("submitForm()", "Submitted form")
- ]
+ # Check if result is visible
+ result_div = page.locator("#result")
+ is_visible = await result_div.is_visible()
+ print(f"Result visible: {is_visible}")
- for script, message in fields:
- await websocket.send(json.dumps({
- "command": "execute_js",
- "script": script
- }))
- await websocket.recv()
- print(f" ✓ {message}")
- await asyncio.sleep(1)
+ # Get the result content
+ result_text = await page.inner_text("#resultContent")
+ print("Form submission result:")
+ print(result_text)
+ # Take a screenshot of the filled form
+ await page.screenshot()
+ print("Screenshot taken of submitted form")
+
+async def demo_element_queries():
+ """Demo element querying and manipulation"""
+ print("\n=== Element Query Demo ===")
+
+ browser_instance = Browser("ws://localhost:8765")
+ await browser_instance.connect()
+
+ try:
+ page = await browser_instance.new_page()
+
+ # Create a test page with multiple elements
+ html_content = """
+
+
+
+ Element Query Test
+
+
+
+ Element Query Test
+
+
Item 1
+
Item 2
+
Item 3 (Hidden)
+
Item 4
+
+
+
+
+
+
+ """
+
+ await page.set_content(html_content)
+ print("Test page loaded")
+
+ # Query single element
+ print("\nQuerying single element...")
+ first_item = await page.query_selector(".item")
+ if first_item:
+ text = await first_item.inner_text()
+ print(f"First item text: {text}")
+
+ # Query all elements
+ print("\nQuerying all items...")
+ all_items = await page.query_selector_all(".item")
+ print(f"Found {len(all_items)} items")
+
+ # Check visibility
+ print("\nChecking visibility of items...")
+ for i, item in enumerate(all_items):
+ is_visible = await item.is_visible()
+ print(f"Item {i+1} visible: {is_visible}")
+
+ # Get attributes
+ print("\nGetting data attributes...")
+ for item in all_items:
+ data_id = await item.get_attribute("data-id")
+ print(f"Item has data-id: {data_id}")
+
+ # Click toggle button to show hidden item
+ print("\nClicking toggle button...")
+ await page.click("#toggleBtn")
+ await asyncio.sleep(0.5)
+
+ # Check visibility again
+ hidden_item = page.locator('[data-id="3"]')
+ is_visible_after = await hidden_item.is_visible()
+ print(f"Hidden item visible after toggle: {is_visible_after}")
+
+ # Click highlight button
+ print("\nClicking highlight button...")
+ await page.click("#highlightBtn")
+ await asyncio.sleep(0.5)
+
+ # Use evaluate to check if highlighting worked
+ has_highlight = await page.evaluate("""
+ () => {
+ const items = document.querySelectorAll('.item.highlight');
+ return items.length;
+ }
+ """)
+ print(f"Number of highlighted items: {has_highlight}")
+
+ finally:
+ await browser_instance.close()
+
+async def demo_wait_conditions():
+ """Demo various wait conditions"""
+ print("\n=== Wait Conditions Demo ===")
+
+ async with browser(ws_endpoint="ws://localhost:8765") as browser_instance:
+ page = await browser_instance.new_page()
+
+ # Create a dynamic page
+ html_content = """
+
+
+
+ Dynamic Content Test
+
+
+
+ Dynamic Content Test
+
+ Content will appear here...
+
+
+
+
+ """
+
+ await page.set_content(html_content)
+ print("Dynamic page loaded")
+
+ # Click load button
+ print("\nClicking load button...")
+ await page.click("#loadBtn")
+
+ # Wait for content to change
+ print("Waiting for content to load...")
+ await page.wait_for_function("""
+ () => {
+ const content = document.getElementById('content');
+ return content && content.classList.contains('loaded');
+ }
+ """, timeout=5000)
+ print("Content loaded!")
+
+ # Wait for extra content
+ print("Waiting for extra content...")
+ extra_content = await page.wait_for_selector("#extraContent", timeout=5000)
+ if extra_content:
+ extra_text = await extra_content.inner_text()
+ print(f"Extra content appeared: {extra_text}")
+
+ # Wait with timeout
+ print("\nWaiting for 2 seconds...")
+ await page.wait_for_timeout(2000)
+ print("Wait completed")
+
+async def demo_javascript_execution():
+ """Demo JavaScript execution capabilities"""
+ print("\n=== JavaScript Execution Demo ===")
+
+ browser_instance = Browser("ws://localhost:8765")
+ await browser_instance.connect()
+
+ try:
+ page = await browser_instance.new_page()
+
+ # Navigate to a simple page
+ await page.goto("https://example.com")
+ await asyncio.sleep(2) # Wait for page to load
+
+ # Execute simple JavaScript
+ print("\nExecuting simple JavaScript...")
+ try:
+ result = await page.evaluate("1 + 2")
+ print(f"1 + 2 = {result}")
+ except Exception as e:
+ print(f"Error executing simple math: {e}")
+
+ # Get page dimensions
+ try:
+ dimensions = await page.evaluate("""
+ () => {
+ return {
+ width: window.innerWidth,
+ height: window.innerHeight,
+ devicePixelRatio: window.devicePixelRatio || 1
+ }
+ }
+ """)
+ print(f"Page dimensions: {dimensions}")
+ except Exception as e:
+ print(f"Error getting dimensions: {e}")
+
+ # Modify page content
+ print("\nModifying page content...")
+ try:
+ await page.evaluate("""
+ () => {
+ const h1 = document.querySelector('h1');
+ if (h1) {
+ h1.style.color = 'red';
+ h1.textContent = 'Modified by Playwright-style API!';
+ }
+ }
+ """)
+ print("Page modified successfully")
+ except Exception as e:
+ print(f"Error modifying page: {e}")
+
+ # Create new elements
+ try:
+ await page.evaluate("""
+ () => {
+ const div = document.createElement('div');
+ div.id = 'custom-div';
+ div.style.cssText = 'position: fixed; top: 10px; right: 10px; ' +
+ 'background: yellow; padding: 20px; ' +
+ 'border: 2px solid black; z-index: 9999;';
+ div.textContent = 'Created via JavaScript!';
+ document.body.appendChild(div);
+ }
+ """)
+ print("New element created")
+ except Exception as e:
+ print(f"Error creating element: {e}")
+
+ await asyncio.sleep(2)
+
+ # Pass arguments to JavaScript
+ print("\nPassing arguments to JavaScript...")
+ try:
+ greeting = await page.evaluate(
+ "(name) => `Hello, ${name}!`",
+ "Playwright User"
+ )
+ print(f"Greeting: {greeting}")
+ except Exception as e:
+ print(f"Error with argument passing: {e}")
+
+ except Exception as e:
+ print(f"Error in demo: {e}")
+ finally:
+ await browser_instance.close()
+
+async def demo_event_handling():
+ """Demo event handling capabilities"""
+ print("\n=== Event Handling Demo ===")
+
+ browser_instance = Browser("ws://localhost:8765")
+ await browser_instance.connect()
+
+ # Set up event listeners
+ events_received = []
+
+ def on_browser_ready(data):
+ events_received.append(("browser_ready", data))
+ print(f"Event: Browser ready - {data}")
+
+ def on_load_started(data):
+ events_received.append(("load_started", data))
+ print(f"Event: Load started - {data.get('url', 'unknown')}")
+
+ def on_load_finished(data):
+ events_received.append(("load_finished", data))
+ print(f"Event: Load finished - {data.get('url', 'unknown')}")
+
+ def on_title_changed(data):
+ events_received.append(("title_changed", data))
+ print(f"Event: Title changed - {data.get('title', 'unknown')}")
+
+ # Register event listeners
+ browser_instance.on("browser_ready", on_browser_ready)
+ browser_instance.on("load_started", on_load_started)
+ browser_instance.on("load_finished", on_load_finished)
+ browser_instance.on("title_changed", on_title_changed)
+
+ try:
+ page = await browser_instance.new_page()
+
+ print("\nNavigating to trigger events...")
+ await page.goto("https://www.python.org")
+ await asyncio.sleep(2)
+
+ print(f"\nTotal events received: {len(events_received)}")
+
+ # Navigate to another page
+ print("\nNavigating to another page...")
+ await page.goto("https://example.com")
+ await asyncio.sleep(2)
+
+ print(f"Total events received: {len(events_received)}")
+
+ finally:
+ await browser_instance.close()
+
+async def demo_backward_compatibility():
+ """Demo backward compatible API usage"""
+ print("\n=== Backward Compatibility Demo ===")
+
+ browser_instance = Browser("ws://localhost:8765")
+ await browser_instance.connect()
+
+ try:
+ page = await browser_instance.new_page()
+
+ # Old-style commands still work
+ print("Using backward-compatible commands...")
+
+ # navigate command
+ result = await browser_instance._send_command({
+ "command": "navigate",
+ "url": "https://example.com"
+ })
+ print(f"Navigate result: {result}")
+ await asyncio.sleep(2)
+
+ # execute_js command
+ result = await browser_instance._send_command({
"command": "execute_js",
- "script": "document.getElementById('result').textContent"
- }))
+ "script": "document.title"
+ })
+ print(f"Page title via execute_js: {result}")
- response = await websocket.recv()
- data = json.loads(response)
- print(f"\nForm result: {data.get('result')}")
+ # simulate_typing command
+ result = await browser_instance._send_command({
+ "command": "simulate_typing",
+ "selector": "h1",
+ "text": "Hello",
+ "delay": 0.1
+ })
+ print(f"Simulate typing result: {result}")
- await asyncio.sleep(5)
+ # get_info command
+ info = await browser_instance._send_command({
+ "command": "get_info"
+ })
+ print(f"Browser info: {info}")
+
+ finally:
+ await browser_instance.close()
+async def run_all_demos():
+ """Run all demo functions"""
+ demos = [
+ demo_basic_navigation,
+ demo_playwright_style,
+ demo_form_interaction,
+ demo_element_queries,
+ demo_wait_conditions,
+ demo_javascript_execution,
+ demo_event_handling,
+ demo_backward_compatibility
+ ]
+
+ print("=" * 60)
+ print("Playwright-style WebSocket Browser Control Demo")
+ print("=" * 60)
+ print("\nMake sure the browser server is running on ws://localhost:8765")
+ print("Start it with: python browser_server_playwright.py")
+
+ input("\nPress Enter to start demos...")
+
+ for demo in demos:
+ try:
+ await demo()
+ await asyncio.sleep(2) # Pause between demos
+ except Exception as e:
+ logger.error(f"Error in {demo.__name__}: {e}")
+ import traceback
+ traceback.print_exc()
+
+ print("\n" + "=" * 60)
+ print("All demos completed!")
+ print("=" * 60)
-
+# Individual demo runners for testing
async def main():
- print("WebSocket Browser Control Demos")
- print("=" * 40)
- print("1. Parallel Browsers - Open 4 sites simultaneously")
- print("2. Automated Testing - Test multiple sites")
- print("3. Form Automation - Fill and submit forms")
- print("4. Run All Demos")
-
- choice = input("\nSelect demo (1-4): ")
-
- if choice == "1":
- await parallel_browser_demo()
- elif choice == "2":
- await automated_testing_demo()
- elif choice == "3":
- await form_automation_demo()
- elif choice == "4":
- print("\n--- Running Parallel Browsers Demo ---")
- await parallel_browser_demo()
- print("\n--- Running Automated Testing Demo ---")
- await automated_testing_demo()
- print("\n--- Running Form Automation Demo ---")
- await form_automation_demo()
+ """Main entry point"""
+ if len(sys.argv) > 1:
+ demo_name = sys.argv[1]
+ demos = {
+ "navigation": demo_basic_navigation,
+ "playwright": demo_playwright_style,
+ "form": demo_form_interaction,
+ "elements": demo_element_queries,
+ "wait": demo_wait_conditions,
+ "javascript": demo_javascript_execution,
+ "events": demo_event_handling,
+ "compatibility": demo_backward_compatibility,
+ "all": run_all_demos
+ }
+
+ if demo_name in demos:
+ await demos[demo_name]()
+ else:
+ print(f"Unknown demo: {demo_name}")
+ print(f"Available demos: {', '.join(demos.keys())}")
else:
- print("Invalid choice")
-
+ await run_all_demos()
if __name__ == "__main__":
- asyncio.run(main())
-
+ try:
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ print("\nDemo interrupted by user")
+ except Exception as e:
+ logger.error(f"Fatal error: {e}")
+ import traceback
+ traceback.print_exc()
diff --git a/server.py b/server.py
index 7ee70e9..e30b6b8 100644
--- a/server.py
+++ b/server.py
@@ -1,7 +1,25 @@
+#!/usr/bin/env python3
+"""
+Production-ready WebSocket Browser Server with Playwright-compatible API
+Manages multiple WebKit2GTK browser instances with improved concurrency.
+
+This server is designed to work with various WebKit2 versions and automatically
+adapts to the available API. It supports both the original command API and
+the new Playwright-compatible API for maximum flexibility.
+
+WebKit2 Version Compatibility:
+- WebKit2 4.1: Full support with newer evaluate_javascript API
+- WebKit2 4.0: Full support with run_javascript API
+- WebKit2 3.0: Basic support with fallback mechanisms
+
+The server automatically detects the available WebKit2 version and uses
+appropriate methods for JavaScript execution and result extraction.
+"""
+
import gi
gi.require_version('Gtk', '3.0')
-
+# Attempt to load WebKit2 with version fallback
webkit_loaded = False
webkit_version = None
for version in ['4.1', '4.0', '3.0']:
@@ -22,464 +40,1878 @@ if not webkit_loaded:
exit(1)
from gi.repository import Gtk, WebKit2, GLib, Gdk
-print(f"Successfully loaded WebKit2 version {webkit_version}")
-
import asyncio
import websockets
import json
import threading
import uuid
-from typing import Dict, Optional, Callable, Any
import time
import base64
+import logging
+from typing import Dict, Optional, Any, List
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+import traceback
+import weakref
+import re
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+print(f"Successfully loaded WebKit2 version {webkit_version}")
+
+@dataclass
+class PendingCallback:
+ """Container for pending JavaScript callbacks"""
+ request_id: str
+ timestamp: float
+ timeout: float = 30.0
+
+ @property
+ def is_expired(self) -> bool:
+ return time.time() - self.timestamp > self.timeout
class RemoteBrowser:
"""
A WebKit2GTK browser instance that can be controlled via WebSocket.
- Each connection gets its own browser window.
+ Each connection gets its own browser window with improved concurrency.
"""
- def __init__(self, connection_id: str, websocket, server):
+ def __init__(self, connection_id: str, websocket, server, loop: asyncio.AbstractEventLoop):
self.connection_id = connection_id
self.websocket = websocket
self.server = server
+ self.loop = loop # The asyncio event loop from the main thread
self.window = None
self.webview = None
- self.pending_callbacks = {}
+ self.pending_callbacks: Dict[str, PendingCallback] = {}
self.is_closing = False
+ self.is_ready = False
+ self._lock = asyncio.Lock()
+ self._cleanup_task = None
+ self._gtk_lock = threading.Lock()
+ self._browser_created = threading.Event()
+ self._creation_error = None
+ async def start(self):
+ """Start the browser and cleanup task"""
+ try:
+ # Start cleanup task for expired callbacks
+ self._cleanup_task = asyncio.create_task(self._cleanup_expired_callbacks())
+ except Exception as e:
+ logger.error(f"Error starting browser tasks: {e}")
+ raise
+
+ async def _cleanup_expired_callbacks(self):
+ """Periodically clean up expired callbacks"""
+ while not self.is_closing:
+ try:
+ await asyncio.sleep(5) # Check every 5 seconds
+
+ async with self._lock:
+ expired_ids = [
+ callback_id for callback_id, callback in self.pending_callbacks.items()
+ if callback.is_expired
+ ]
+
+ for callback_id in expired_ids:
+ try:
+ async with self._lock:
+ callback = self.pending_callbacks.pop(callback_id, None)
+ if callback:
+ logger.warning(f"Command timed out for request_id: {callback.request_id}")
+ await self.send_response(
+ callback.request_id,
+ None,
+ f"Command for request_id {callback.request_id} timed out"
+ )
+ except Exception as e:
+ logger.error(f"Error handling expired callback {callback_id}: {e}")
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ logger.error(f"Error in cleanup task: {e}")
+
def create_browser(self):
"""Create the browser window in the GTK thread."""
- self.window = Gtk.Window(title=f"Remote Browser - {self.connection_id[:8]}")
- self.window.set_default_size(1024, 768)
- self.window.connect("destroy", self.on_window_destroy)
-
-
- main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
- self.window.add(main_box)
-
-
- nav_bar = self.create_navigation_bar()
- main_box.pack_start(nav_bar, False, False, 0)
-
-
- self.webview = WebKit2.WebView()
- self.setup_webview()
-
-
- scrolled_window = Gtk.ScrolledWindow()
- scrolled_window.add(self.webview)
- main_box.pack_start(scrolled_window, True, True, 0)
-
+ try:
+ with self._gtk_lock:
+ if self.is_closing:
+ return
- self.status_bar = Gtk.Statusbar()
- self.status_bar.push(0, f"Connected: {self.connection_id[:8]}")
- main_box.pack_start(self.status_bar, False, False, 0)
-
- self.window.show_all()
+ self.window = Gtk.Window(title=f"Remote Browser - {self.connection_id[:8]}")
+ self.window.set_default_size(1280, 720)
+ self.window.connect("destroy", self.on_window_destroy)
+
+ # Set window properties for stability
+ self.window.set_resizable(True)
+ self.window.set_deletable(True)
+
+ main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
+ self.window.add(main_box)
+
+ nav_bar = self.create_navigation_bar()
+ main_box.pack_start(nav_bar, False, False, 0)
+
+ self.webview = WebKit2.WebView()
+ if not self.setup_webview():
+ raise Exception("Failed to setup WebView")
+
+ scrolled_window = Gtk.ScrolledWindow()
+ scrolled_window.add(self.webview)
+ main_box.pack_start(scrolled_window, True, True, 0)
+
+ self.status_bar = Gtk.Statusbar()
+ self.status_context = self.status_bar.get_context_id("connection")
+ self.status_bar.push(self.status_context, f"Connected: {self.connection_id[:8]}")
+ main_box.pack_start(self.status_bar, False, False, 0)
+
+ self.window.show_all()
+ self.window.present()
+ self.is_ready = True
+ self._browser_created.set()
+
+ GLib.idle_add(self._notify_ready)
+
+ except Exception as e:
+ logger.error(f"Error creating browser window: {e}")
+ self._creation_error = str(e)
+ self._browser_created.set()
+ GLib.idle_add(self._notify_error, str(e))
+
+ def _notify_ready(self):
+ """Notify that browser is ready (called from GTK thread)."""
+ if not self.is_closing:
+ try:
+ coro = self.send_event("browser_ready", {"connection_id": self.connection_id})
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except:
+ pass
+
+ def _execute_wrapped_js(self, script: str, request_id: str):
+ """Execute JavaScript wrapped to always return a string for maximum compatibility."""
+ def js_finished(webview, task, user_data):
+ try:
+ # Try to get the result string directly
+ result_str = None
+
+ try:
+ # For very old WebKit2 versions, try the simple approach
+ result = webview.run_javascript_finish(task)
+ result_str = str(result)
+
+ # Check if the result looks like our wrapped response
+ if result_str and ('"success":true' in result_str or '"success":false' in result_str):
+ # Extract the JSON from the string representation
+ import re
+ json_match = re.search(r'\{.*\}', result_str)
+ if json_match:
+ result_str = json_match.group(0)
+ except Exception as e:
+ logger.debug(f"Failed to get result string: {e}")
+
+ if result_str:
+ try:
+ parsed = json.loads(result_str)
+ if parsed.get("success"):
+ value = parsed.get("value")
+ else:
+ value = None
+ error_msg = parsed.get("error", "Unknown error")
+ coro = self.send_response(request_id, value, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+ except json.JSONDecodeError:
+ value = result_str
+ else:
+ value = None
+
+ coro = self.send_response(request_id, value)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"Error in wrapped JS execution: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ try:
+ self.webview.run_javascript(script, None, js_finished, None)
+ except Exception as e:
+ error_msg = f"Failed to execute wrapped JavaScript: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ def _execute_wait_check(self, script: str, check_id: str, callback):
+ """Execute a simple check for wait operations."""
+ def js_finished(webview, task, user_data):
+ try:
+ result_str = None
+ try:
+ result = webview.run_javascript_finish(task)
+ # Extract the JSON string from the result
+ if hasattr(result, 'get_js_value'):
+ js_value = result.get_js_value()
+ if hasattr(js_value, 'to_string'):
+ result_str = js_value.to_string()
+ elif hasattr(result, 'get_value'):
+ result_str = result.get_value()
+ else:
+ result_str = str(result)
+ # Clean up string representation
+ import re
+ json_match = re.search(r'\{.*\}', result_str)
+ if json_match:
+ result_str = json_match.group(0)
+ except Exception as e:
+ logger.debug(f"Failed to get wait check result: {e}")
+
+ callback(result_str)
+ except Exception as e:
+ logger.error(f"Error in wait check: {e}")
+ callback(None)
+ try:
+ self.webview.run_javascript(script, None, js_finished, None)
+ except Exception as e:
+ logger.error(f"Failed to execute wait check: {e}")
+ callback(None)
+
+ def _notify_error(self, error_msg):
+ """Notify about browser creation error (called from GTK thread)."""
+ if not self.is_closing:
+ try:
+ coro = self.send_event("browser_error", {"error": error_msg})
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ logger.error(f"Error notifying browser error: {e}")
+
def create_navigation_bar(self):
"""Create a simple navigation bar."""
- nav_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
- nav_bar.set_margin_top(5)
- nav_bar.set_margin_bottom(5)
- nav_bar.set_margin_left(5)
- nav_bar.set_margin_right(5)
+ try:
+ nav_bar = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
+ nav_bar.set_margin_top(5)
+ nav_bar.set_margin_bottom(5)
+ # Use newer margin methods instead of deprecated ones
+ nav_bar.set_margin_start(5)
+ nav_bar.set_margin_end(5)
+
+ self.url_label = Gtk.Label()
+ self.url_label.set_ellipsize(True)
+ self.url_label.set_max_width_chars(50)
+ nav_bar.pack_start(self.url_label, True, True, 0)
+
+ return nav_bar
+ except Exception as e:
+ logger.error(f"Error creating navigation bar: {e}")
+ # Return minimal nav bar on error
+ return Gtk.Box()
-
- self.url_label = Gtk.Label()
- self.url_label.set_ellipsize(True)
- self.url_label.set_max_width_chars(50)
- nav_bar.pack_start(self.url_label, True, True, 0)
-
- return nav_bar
-
- def setup_webview(self):
+ def setup_webview(self) -> bool:
"""Configure WebKit view settings."""
- settings = self.webview.get_settings()
- settings.set_enable_developer_extras(True)
- settings.set_enable_javascript(True)
- settings.set_allow_file_access_from_file_urls(True)
-
-
- self.webview.connect("load-changed", self.on_load_changed)
- self.webview.connect("notify::uri", self.on_uri_changed)
- self.webview.connect("load-failed", self.on_load_failed)
-
+ try:
+ if not self.webview:
+ return False
+
+ settings = self.webview.get_settings()
+ settings.set_enable_developer_extras(True)
+ settings.set_enable_javascript(True)
+ settings.set_allow_file_access_from_file_urls(True)
+ settings.set_enable_webgl(True)
+ settings.set_enable_media_stream(True)
+
+ # Only set non-deprecated security settings
+ try:
+ # These might be available in newer versions
+ if hasattr(settings, 'set_enable_java'):
+ settings.set_enable_java(False)
+ if hasattr(settings, 'set_enable_plugins'):
+ settings.set_enable_plugins(False)
+ except:
+ # Ignore if not available
+ pass
+
+ # Connect signals with weak references to avoid circular references
+ self.webview.connect("load-changed", self.on_load_changed)
+ self.webview.connect("notify::uri", self.on_uri_changed)
+ self.webview.connect("load-failed", self.on_load_failed)
+ self.webview.connect("notify::title", self.on_title_changed)
+
+ return True
+
+ except Exception as e:
+ logger.error(f"Error setting up webview: {e}")
+ return False
+
def on_window_destroy(self, widget):
"""Handle window close event."""
if not self.is_closing:
self.is_closing = True
- asyncio.create_task(self.close())
+ try:
+ asyncio.run_coroutine_threadsafe(self.close(), self.loop)
+ except Exception as e:
+ logger.error(f"Error scheduling close: {e}")
def on_load_changed(self, webview, load_event):
- """Handle page load events."""
- if load_event == WebKit2.LoadEvent.FINISHED:
- asyncio.create_task(self.send_event("load_finished", {
- "url": webview.get_uri(),
- "title": webview.get_title()
- }))
- elif load_event == WebKit2.LoadEvent.STARTED:
- asyncio.create_task(self.send_event("load_started", {
- "url": webview.get_uri()
- }))
+ """Handle page load events (called from GTK thread)."""
+ if self.is_closing:
+ return
+
+ try:
+ event_data = {}
+
+ # Safely get URI and title
+ try:
+ uri = webview.get_uri()
+ if uri:
+ event_data["url"] = uri
+ except:
+ pass
+
+ try:
+ title = webview.get_title()
+ if title:
+ event_data["title"] = title
+ except:
+ pass
+
+ if load_event == WebKit2.LoadEvent.FINISHED:
+ coro = self.send_event("load_finished", event_data)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ elif load_event == WebKit2.LoadEvent.STARTED:
+ coro = self.send_event("load_started", event_data)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ elif load_event == WebKit2.LoadEvent.COMMITTED:
+ coro = self.send_event("load_committed", event_data)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ logger.error(f"Error in load changed handler: {e}")
def on_uri_changed(self, webview, param):
"""Update URL label when URI changes."""
- uri = webview.get_uri()
- if uri:
- self.url_label.set_text(uri)
+ if self.is_closing:
+ return
+
+ try:
+ uri = webview.get_uri()
+ if uri and self.url_label:
+ self.url_label.set_text(uri)
+ except Exception as e:
+ logger.error(f"Error updating URI: {e}")
+
+ def on_title_changed(self, webview, param):
+ """Handle title changes (called from GTK thread)."""
+ if self.is_closing:
+ return
+
+ try:
+ title = webview.get_title()
+ uri = None
+ try:
+ uri = webview.get_uri()
+ except:
+ pass
+
+ if title:
+ event_data = {"title": title}
+ if uri:
+ event_data["url"] = uri
+ coro = self.send_event("title_changed", event_data)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ logger.error(f"Error in title changed handler: {e}")
def on_load_failed(self, webview, load_event, failing_uri, error):
- """Handle load failures."""
- asyncio.create_task(self.send_event("load_failed", {
- "url": failing_uri,
- "error": error.message if error else "Unknown error"
- }))
+ """Handle load failures (called from GTK thread)."""
+ if self.is_closing:
+ return
+
+ try:
+ error_msg = error.message if error else "Unknown error"
+ coro = self.send_event("load_failed", {"url": failing_uri, "error": error_msg})
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ logger.error(f"Error in load failed handler: {e}")
async def send_event(self, event_type: str, data: dict):
"""Send an event to the WebSocket client."""
+ if self.is_closing or not self.websocket:
+ return
+
try:
- await self.websocket.send(json.dumps({
- "type": "event",
- "event": event_type,
- "data": data
- }))
- except:
- pass
+ async with self._lock:
+ if self.websocket.state == websockets.protocol.State.OPEN:
+ await self.websocket.send(json.dumps({
+ "type": "event",
+ "event": event_type,
+ "data": data,
+ "timestamp": time.time()
+ }))
+ except websockets.exceptions.ConnectionClosed:
+ self.is_closing = True
+ except Exception as e:
+ logger.error(f"Error sending event {event_type}: {e}")
async def send_response(self, request_id: str, result: Any, error: Optional[str] = None):
"""Send a response to a command."""
+ if self.is_closing or not self.websocket:
+ return
+
try:
- await self.websocket.send(json.dumps({
- "type": "response",
- "request_id": request_id,
- "result": result,
- "error": error
- }))
- except:
- pass
+ async with self._lock:
+ if self.websocket.state == websockets.protocol.State.OPEN:
+ await self.websocket.send(json.dumps({
+ "type": "response",
+ "request_id": request_id,
+ "result": result,
+ "error": error,
+ "timestamp": time.time()
+ }))
+ except websockets.exceptions.ConnectionClosed:
+ self.is_closing = True
+ except Exception as e:
+ logger.error(f"Error sending response for {request_id}: {e}")
def execute_javascript(self, script: str, request_id: str):
"""Execute JavaScript and send result back via WebSocket."""
+ if not self.webview or self.is_closing:
+ coro = self.send_response(request_id, None, "Browser not ready")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
+ # Store callback info with lock
+ try:
+ with self._gtk_lock:
+ self.pending_callbacks[request_id] = PendingCallback(request_id, time.time())
+ except Exception as e:
+ logger.error(f"Error storing callback: {e}")
+ coro = self.send_response(request_id, None, str(e))
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
def js_finished(webview, task, user_data):
try:
- result = webview.run_javascript_finish(task)
- js_result = result.get_js_value()
-
-
- if js_result.is_string():
- value = js_result.to_string()
- elif js_result.is_number():
- value = js_result.to_double()
- elif js_result.is_boolean():
- value = js_result.to_boolean()
- elif js_result.is_object():
-
- self.webview.run_javascript(
- f"JSON.stringify({script})",
- None,
- lambda wv, t, ud: self._handle_json_result(wv, t, request_id)
- )
- return
- else:
- value = None
-
- asyncio.create_task(self.send_response(request_id, value))
+ with self._gtk_lock:
+ callback = self.pending_callbacks.pop(request_id, None)
+ if not callback:
+ return # Timed out or already handled
except Exception as e:
- asyncio.create_task(self.send_response(request_id, None, str(e)))
+ logger.error(f"Error retrieving callback: {e}")
+ return
+
+ try:
+ # Different WebKit2 versions have different APIs
+ result = None
+ value = None
+
+ # Try newer API first
+ if hasattr(webview, 'evaluate_javascript_finish'):
+ try:
+ result = webview.evaluate_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"evaluate_javascript_finish failed: {e}")
+ result = None
+
+ # Fallback to older API
+ if result is None and hasattr(webview, 'run_javascript_finish'):
+ try:
+ result = webview.run_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"run_javascript_finish failed: {e}")
+ result = None
+
+ if result is None:
+ # If both methods failed, try to return a simple result
+ # This might happen with very old WebKit2 versions
+ logger.info("Falling back to JSON string execution for compatibility")
+ # Wrap the script to always return a JSON string
+ wrapped_script = f"""
+ (function() {{
+ try {{
+ var result = ({script});
+ return JSON.stringify({{success: true, value: result}});
+ }} catch (e) {{
+ return JSON.stringify({{success: false, error: e.toString()}});
+ }}
+ }})()
+ """
+ self._execute_wrapped_js(wrapped_script, request_id)
+ return
+
+ # Try to extract the value from the result
+ # Different WebKit2 versions use different methods
+ if hasattr(result, 'get_js_value'):
+ js_result = result.get_js_value()
+ if hasattr(js_result, 'is_string') and js_result.is_string():
+ value = js_result.to_string()
+ elif hasattr(js_result, 'is_number') and js_result.is_number():
+ value = js_result.to_double()
+ elif hasattr(js_result, 'is_boolean') and js_result.is_boolean():
+ value = js_result.to_boolean()
+ elif hasattr(js_result, 'is_object') and (js_result.is_object() or js_result.is_array()):
+ # Handle JSON serialization
+ json_script = f"JSON.stringify(({script}))"
+ self._execute_simple_js(json_script, request_id)
+ return
+ else:
+ value = None
+ elif hasattr(result, 'get_value'):
+ # Older WebKit2 API
+ try:
+ value = result.get_value()
+ except:
+ # If get_value fails, try JSON serialization
+ json_script = f"JSON.stringify(({script}))"
+ self._execute_simple_js(json_script, request_id)
+ return
+ else:
+ # Last resort: try JSON serialization
+ json_script = f"JSON.stringify(({script}))"
+ self._execute_simple_js(json_script, request_id)
+ return
+
+ coro = self.send_response(request_id, value)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"JavaScript execution error: {str(e)}"
+ logger.error(f"{error_msg} - Script: {script[:100]}...")
+ # Try simple JSON serialization as fallback
+ try:
+ json_script = f"JSON.stringify(({script}))"
+ self._execute_simple_js(json_script, request_id)
+ except:
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
- self.webview.run_javascript(script, None, js_finished, None)
-
- def _handle_json_result(self, webview, task, request_id):
- """Handle JSON stringified JavaScript results."""
try:
- result = webview.run_javascript_finish(task)
- js_result = result.get_js_value()
- json_str = js_result.to_string()
- value = json.loads(json_str) if json_str else None
- asyncio.create_task(self.send_response(request_id, value))
+ # Use newer evaluate_javascript method if available
+ if hasattr(self.webview, 'evaluate_javascript'):
+ self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None)
+ else:
+ # Fallback to deprecated method
+ self.webview.run_javascript(script, None, js_finished, None)
except Exception as e:
- asyncio.create_task(self.send_response(request_id, None, str(e)))
+ with self._gtk_lock:
+ self.pending_callbacks.pop(request_id, None)
+ error_msg = f"Failed to execute JavaScript: {str(e)}"
+ logger.error(f"{error_msg} - Script: {script[:100]}...")
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ def _execute_simple_js(self, script: str, request_id: str):
+ """Execute simple JavaScript that returns a string result."""
+ def js_finished(webview, task, user_data):
+ try:
+ # Different WebKit2 versions have different APIs
+ result = None
+ json_str = None
+
+ # Try newer API first
+ if hasattr(webview, 'evaluate_javascript_finish'):
+ try:
+ result = webview.evaluate_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"evaluate_javascript_finish failed: {e}")
+ result = None
+
+ # Fallback to older API
+ if result is None and hasattr(webview, 'run_javascript_finish'):
+ try:
+ result = webview.run_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"run_javascript_finish failed: {e}")
+ result = None
+
+ if result is None:
+ error_msg = "Failed to get JavaScript result"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
+ # Try to extract the string value
+ if hasattr(result, 'get_js_value'):
+ js_result = result.get_js_value()
+ if hasattr(js_result, 'to_string'):
+ json_str = js_result.to_string()
+ else:
+ json_str = str(js_result)
+ elif hasattr(result, 'get_value'):
+ # Older WebKit2 API
+ try:
+ json_str = result.get_value()
+ except:
+ json_str = str(result)
+ else:
+ # Last resort
+ json_str = str(result)
+
+ try:
+ value = json.loads(json_str) if json_str else None
+ except json.JSONDecodeError:
+ value = json_str # Return as string if not valid JSON
+
+ coro = self.send_response(request_id, value)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"Error parsing JavaScript result: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ try:
+ # Use newer evaluate_javascript method if available
+ if hasattr(self.webview, 'evaluate_javascript'):
+ self.webview.evaluate_javascript(script, -1, None, None, None, js_finished, None)
+ else:
+ # Fallback to deprecated method
+ self.webview.run_javascript(script, None, js_finished, None)
+ except Exception as e:
+ error_msg = f"Failed to execute simple JavaScript: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ def get_inner_html(self, selector: str, request_id):
+ script = f"document.querySelector({json.dumps(selector)})?.innerHTML || null"
+ self.execute_javascript(script, request_id)
+
+
+ def simulate_typing(self, selector: str, text: str, request_id: str, delay: float = 0.1):
+ """Simulate typing into an element. This is now fully async in JS."""
+ if not self.webview or self.is_closing:
+ coro = self.send_response(request_id, None, "Browser not ready")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
+ # Validate inputs
+ if not selector:
+ coro = self.send_response(request_id, None, "Selector cannot be empty")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
+ # Sanitize delay
+ delay = max(0, min(delay, 5.0)) # Limit delay between 0 and 5 seconds
+
+ # Store callback with extended timeout
+ timeout = 30 + len(text) * delay
+ try:
+ with self._gtk_lock:
+ self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=timeout)
+ except Exception as e:
+ logger.error(f"Error storing typing callback: {e}")
+ coro = self.send_response(request_id, None, str(e))
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ return
+
+ # Create a self-contained async function that returns a stringified result
+ typing_script = f"""
+ (async function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found: {selector}' }});
+ }}
+
+ // Check if element is an input or textarea
+ if (!['INPUT', 'TEXTAREA'].includes(element.tagName) && !element.isContentEditable) {{
+ return JSON.stringify({{ success: false, error: 'Element is not editable' }});
+ }}
+
+ element.focus();
+
+ const text = {json.dumps(text)};
+ const delay = {delay * 1000};
+
+ // Clear existing value
+ if (element.value !== undefined) {{
+ element.value = '';
+ }}
+
+ for (const char of text) {{
+ if (element.value !== undefined) {{
+ element.value += char;
+ }} else if (element.isContentEditable) {{
+ element.textContent += char;
+ }}
+
+ element.dispatchEvent(new Event('input', {{ bubbles: true }}));
+ element.dispatchEvent(new KeyboardEvent('keydown', {{ key: char, bubbles: true }}));
+ element.dispatchEvent(new KeyboardEvent('keyup', {{ key: char, bubbles: true }}));
+
+ if (delay > 0) await new Promise(r => setTimeout(r, delay));
+ }}
+
+ // Trigger change event at the end
+ element.dispatchEvent(new Event('change', {{ bubbles: true }}));
+
+ return JSON.stringify({{ success: true, typed: text.length }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+
+ def typing_finished(webview, task, user_data):
+ try:
+ with self._gtk_lock:
+ callback = self.pending_callbacks.pop(request_id, None)
+ if not callback:
+ return
+ except Exception as e:
+ logger.error(f"Error retrieving typing callback: {e}")
+ return
+
+ try:
+ # Different WebKit2 versions have different APIs
+ result = None
+ json_str = None
+
+ # Try newer API first
+ if hasattr(webview, 'evaluate_javascript_finish'):
+ try:
+ result = webview.evaluate_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"evaluate_javascript_finish failed: {e}")
+ result = None
+
+ # Fallback to older API
+ if result is None and hasattr(webview, 'run_javascript_finish'):
+ try:
+ result = webview.run_javascript_finish(task)
+ except Exception as e:
+ logger.debug(f"run_javascript_finish failed: {e}")
+ result = None
+
+ if result is None:
+ value = {"success": False, "error": "Failed to get typing result"}
+ else:
+ # Try to extract the string value
+ if hasattr(result, 'get_js_value'):
+ js_result = result.get_js_value()
+ if hasattr(js_result, 'to_string'):
+ json_str = js_result.to_string()
+ else:
+ json_str = str(js_result)
+ elif hasattr(result, 'get_value'):
+ # Older WebKit2 API
+ try:
+ json_str = result.get_value()
+ except:
+ json_str = str(result)
+ else:
+ # Last resort
+ json_str = str(result)
+
+ try:
+ value = json.loads(json_str) if json_str else None
+ except json.JSONDecodeError:
+ value = {"success": False, "error": f"Invalid JSON response: {json_str}"}
+
+ coro = self.send_response(request_id, value)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"Error in typing finished callback: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ try:
+ # Execute the typing script
+ # Use newer evaluate_javascript method if available
+ if hasattr(self.webview, 'evaluate_javascript'):
+ self.webview.evaluate_javascript(typing_script, -1, None, None, None, typing_finished, None)
+ else:
+ # Fallback to deprecated method
+ self.webview.run_javascript(typing_script, None, typing_finished, None)
+ except Exception as e:
+ with self._gtk_lock:
+ self.pending_callbacks.pop(request_id, None)
+ error_msg = f"Failed to start typing simulation: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ def execute_playwright_action(self, action: str, params: dict, request_id: str):
+ """Execute a Playwright-style action."""
+ # For all Playwright actions, we use the more compatible JSON-wrapped approach
+ # This ensures the result is always a string that can be parsed
+
+ if action == "click":
+ selector = params.get("selector")
+ options = params.get("options", {})
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+
+ // Get element position
+ const rect = element.getBoundingClientRect();
+ const x = rect.left + rect.width / 2;
+ const y = rect.top + rect.height / 2;
+
+ // Dispatch mouse events
+ const mouseEventInit = {{
+ bubbles: true,
+ cancelable: true,
+ view: window,
+ clientX: x,
+ clientY: y
+ }};
+
+ element.dispatchEvent(new MouseEvent('mousedown', mouseEventInit));
+ element.dispatchEvent(new MouseEvent('mouseup', mouseEventInit));
+ element.dispatchEvent(new MouseEvent('click', mouseEventInit));
+
+ // Also trigger if it's a link or button
+ if (element.tagName === 'A' || element.tagName === 'BUTTON') {{
+ element.click();
+ }}
+
+ return JSON.stringify({{ success: true }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "fill":
+ selector = params.get("selector")
+ value = params.get("value", "")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+
+ element.focus();
+
+ if (element.value !== undefined) {{
+ element.value = {json.dumps(value)};
+ }} else if (element.isContentEditable) {{
+ element.textContent = {json.dumps(value)};
+ }} else {{
+ return JSON.stringify({{ success: false, error: 'Element is not fillable' }});
+ }}
+
+ element.dispatchEvent(new Event('input', {{ bubbles: true }}));
+ element.dispatchEvent(new Event('change', {{ bubbles: true }}));
+
+ return JSON.stringify({{ success: true }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "hover":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+
+ const rect = element.getBoundingClientRect();
+ const x = rect.left + rect.width / 2;
+ const y = rect.top + rect.height / 2;
+
+ const mouseEvent = new MouseEvent('mouseover', {{
+ bubbles: true,
+ cancelable: true,
+ view: window,
+ clientX: x,
+ clientY: y
+ }});
+
+ element.dispatchEvent(mouseEvent);
+ element.dispatchEvent(new MouseEvent('mouseenter', {{ bubbles: true }}));
+
+ return JSON.stringify({{ success: true }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "focus":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ element.focus();
+ return JSON.stringify({{ success: true }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "check":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ if (element.type !== 'checkbox' && element.type !== 'radio') {{
+ return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }});
+ }}
+ if (!element.checked) {{
+ element.checked = true;
+ element.dispatchEvent(new Event('change', {{ bubbles: true }}));
+ }}
+ return JSON.stringify({{ success: true, checked: element.checked }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "uncheck":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ if (element.type !== 'checkbox' && element.type !== 'radio') {{
+ return JSON.stringify({{ success: false, error: 'Element is not a checkbox or radio' }});
+ }}
+ if (element.checked) {{
+ element.checked = false;
+ element.dispatchEvent(new Event('change', {{ bubbles: true }}));
+ }}
+ return JSON.stringify({{ success: true, checked: element.checked }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "select_option":
+ selector = params.get("selector")
+ values = params.get("values", [])
+ if isinstance(values, str):
+ values = [values]
+ script = f"""
+ (function() {{
+ try {{
+ const select = document.querySelector({json.dumps(selector)});
+ if (!select) {{
+ return JSON.stringify({{ success: false, error: 'Select element not found' }});
+ }}
+ if (select.tagName !== 'SELECT') {{
+ return JSON.stringify({{ success: false, error: 'Element is not a select' }});
+ }}
+
+ const values = {json.dumps(values)};
+ const selected = [];
+
+ // Clear previous selections if not multiple
+ if (!select.multiple) {{
+ Array.from(select.options).forEach(opt => opt.selected = false);
+ }}
+
+ // Select new values
+ for (const value of values) {{
+ const option = Array.from(select.options).find(
+ opt => opt.value === value || opt.text === value
+ );
+ if (option) {{
+ option.selected = true;
+ selected.push(option.value);
+ }}
+ }}
+
+ select.dispatchEvent(new Event('change', {{ bubbles: true }}));
+
+ return JSON.stringify({{ success: true, selected: selected }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "press":
+ selector = params.get("selector")
+ key = params.get("key")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+
+ element.focus();
+
+ const key = {json.dumps(key)};
+ const keyEvent = new KeyboardEvent('keydown', {{
+ key: key,
+ code: key,
+ bubbles: true,
+ cancelable: true
+ }});
+
+ element.dispatchEvent(keyEvent);
+ element.dispatchEvent(new KeyboardEvent('keyup', {{
+ key: key,
+ code: key,
+ bubbles: true,
+ cancelable: true
+ }}));
+
+ // Handle special keys
+ if (key === 'Enter' && (element.tagName === 'INPUT' || element.tagName === 'TEXTAREA')) {{
+ const form = element.form;
+ if (form && element.tagName === 'INPUT') {{
+ form.dispatchEvent(new Event('submit', {{ bubbles: true, cancelable: true }}));
+ }}
+ }}
+
+ return JSON.stringify({{ success: true }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "wait_for_selector":
+ selector = params.get("selector")
+ options = params.get("options", {})
+ timeout = options.get("timeout", 30000)
+ state = options.get("state", "visible")
+
+ # Simplified synchronous check
+ script = f"""
+ (function() {{
+ const selector = {json.dumps(selector)};
+ const state = {json.dumps(state)};
+ const element = document.querySelector(selector);
+
+ if (state === 'attached') {{
+ return JSON.stringify({{ success: !!element, found: !!element }});
+ }} else if (state === 'detached') {{
+ return JSON.stringify({{ success: !element, found: false }});
+ }} else if (state === 'visible') {{
+ if (!element) {{
+ return JSON.stringify({{ success: false, found: false }});
+ }}
+ const style = window.getComputedStyle(element);
+ const isVisible = style.display !== 'none' &&
+ style.visibility !== 'hidden' &&
+ style.opacity !== '0';
+ return JSON.stringify({{ success: isVisible, found: isVisible }});
+ }} else if (state === 'hidden') {{
+ if (!element) {{
+ return JSON.stringify({{ success: true, found: false }});
+ }}
+ const style = window.getComputedStyle(element);
+ const isHidden = style.display === 'none' ||
+ style.visibility === 'hidden' ||
+ style.opacity === '0';
+ return JSON.stringify({{ success: isHidden, found: true }});
+ }}
+ return JSON.stringify({{ success: false, found: false }});
+ }})()
+ """
+
+ # For now, just do a single check (polling would require more complex implementation)
+ # In a real implementation, you might want to implement polling on the Python side
+ self._execute_simple_js(script, request_id)
+
+ elif action == "wait_for_load_state":
+ state = params.get("state", "load")
+ timeout = params.get("timeout", 30000)
+
+ script = f"""
+ (async function() {{
+ const state = {json.dumps(state)};
+ const timeout = {timeout};
+ const startTime = Date.now();
+
+ if (state === 'domcontentloaded') {{
+ if (document.readyState === 'loading') {{
+ await new Promise(resolve => {{
+ document.addEventListener('DOMContentLoaded', resolve);
+ }});
+ }}
+ return JSON.stringify({{ success: true }});
+ }} else if (state === 'load') {{
+ if (document.readyState !== 'complete') {{
+ await new Promise(resolve => {{
+ window.addEventListener('load', resolve);
+ }});
+ }}
+ return JSON.stringify({{ success: true }});
+ }} else if (state === 'networkidle') {{
+ // Simple network idle implementation
+ let pendingRequests = 0;
+ const observer = new PerformanceObserver((list) => {{
+ for (const entry of list.getEntries()) {{
+ if (entry.entryType === 'resource') {{
+ pendingRequests--;
+ }}
+ }}
+ }});
+ observer.observe({{ entryTypes: ['resource'] }});
+
+ while (Date.now() - startTime < timeout) {{
+ if (pendingRequests <= 0) {{
+ observer.disconnect();
+ return JSON.stringify({{ success: true }});
+ }}
+ await new Promise(r => setTimeout(r, 500));
+ }}
+
+ observer.disconnect();
+ return JSON.stringify({{ success: false, error: 'Timeout waiting for network idle' }});
+ }}
+
+ return JSON.stringify({{ success: true }});
+ }})()
+ """
+ self.execute_javascript(script, request_id)
+
+ elif action == "wait_for_timeout":
+ timeout = params.get("timeout", 1000)
+ script = f"""
+ (async function() {{
+ await new Promise(r => setTimeout(r, {timeout}));
+ return JSON.stringify({{ success: true }});
+ }})()
+ """
+ self.execute_javascript(script, request_id)
+
+ elif action == "get_attribute":
+ selector = params.get("selector")
+ name = params.get("name")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ const value = element.getAttribute({json.dumps(name)});
+ return JSON.stringify({{ success: true, value: value }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "inner_text":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ return JSON.stringify({{ success: true, text: element.innerText || '' }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "inner_html":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ return JSON.stringify({{ success: true, html: element.innerHTML || '' }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "is_visible":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: true, visible: false }});
+ }}
+ const style = window.getComputedStyle(element);
+ const isVisible = style.display !== 'none' &&
+ style.visibility !== 'hidden' &&
+ style.opacity !== '0' &&
+ element.offsetWidth > 0 &&
+ element.offsetHeight > 0;
+ return JSON.stringify({{ success: true, visible: isVisible }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "is_enabled":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ const isEnabled = !element.disabled;
+ return JSON.stringify({{ success: true, enabled: isEnabled }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "is_checked":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ if (!element) {{
+ return JSON.stringify({{ success: false, error: 'Element not found' }});
+ }}
+ const isChecked = element.checked || false;
+ return JSON.stringify({{ success: true, checked: isChecked }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "query_selector":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const element = document.querySelector({json.dumps(selector)});
+ return JSON.stringify({{ success: true, found: !!element }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ elif action == "query_selector_all":
+ selector = params.get("selector")
+ script = f"""
+ (function() {{
+ try {{
+ const elements = document.querySelectorAll({json.dumps(selector)});
+ return JSON.stringify({{ success: true, count: elements.length }});
+ }} catch (e) {{
+ return JSON.stringify({{ success: false, error: e.toString() }});
+ }}
+ }})()
+ """
+ self._execute_simple_js(script, request_id)
+
+ else:
+ coro = self.send_response(request_id, None, f"Unknown Playwright action: {action}")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
async def handle_command(self, command: dict):
"""Handle commands from WebSocket client."""
cmd_type = command.get("command")
request_id = command.get("request_id", str(uuid.uuid4()))
+ if not cmd_type:
+ await self.send_response(request_id, None, "No command specified")
+ return
+
+ if not self.is_ready and cmd_type not in ["get_info", "close"]:
+ await self.send_response(request_id, None, "Browser not ready yet")
+ return
+
try:
+ # Backward compatible commands
if cmd_type == "navigate":
url = command.get("url")
- GLib.idle_add(self.webview.load_uri, url)
+ if not url:
+ await self.send_response(request_id, None, "URL is required")
+ return
+
+ # Basic URL validation
+ if not url.startswith(('http://', 'https://', 'file://', 'about:')):
+ url = 'https://' + url
+
+ def safe_navigate():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.load_uri(url)
+ except Exception as e:
+ logger.error(f"Error navigating to {url}: {e}")
+
+ GLib.idle_add(safe_navigate)
await self.send_response(request_id, {"status": "navigating", "url": url})
-
+
elif cmd_type == "execute_js":
script = command.get("script")
+ if not script:
+ await self.send_response(request_id, None, "Script is required")
+ return
+
GLib.idle_add(self.execute_javascript, script, request_id)
+
+ elif cmd_type == "simulate_typing":
+ selector = command.get("selector")
+ text = command.get("text", "")
+ delay = command.get("delay", 0.1)
+ if not selector:
+ await self.send_response(request_id, None, "Selector is required")
+ return
+
+ GLib.idle_add(self.simulate_typing, selector, text, request_id, delay)
+
elif cmd_type == "go_back":
- GLib.idle_add(self.webview.go_back)
+ def safe_go_back():
+ try:
+ if self.webview and self.webview.can_go_back():
+ self.webview.go_back()
+ except Exception as e:
+ logger.error(f"Error going back: {e}")
+
+ GLib.idle_add(safe_go_back)
await self.send_response(request_id, {"status": "ok"})
-
+
elif cmd_type == "go_forward":
- GLib.idle_add(self.webview.go_forward)
+ def safe_go_forward():
+ try:
+ if self.webview and self.webview.can_go_forward():
+ self.webview.go_forward()
+ except Exception as e:
+ logger.error(f"Error going forward: {e}")
+
+ GLib.idle_add(safe_go_forward)
await self.send_response(request_id, {"status": "ok"})
-
+
elif cmd_type == "reload":
- GLib.idle_add(self.webview.reload)
+ def safe_reload():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.reload()
+ except Exception as e:
+ logger.error(f"Error reloading: {e}")
+
+ GLib.idle_add(safe_reload)
await self.send_response(request_id, {"status": "reloading"})
- elif cmd_type == "stop":
- GLib.idle_add(self.webview.stop_loading)
- await self.send_response(request_id, {"status": "stopped"})
-
elif cmd_type == "get_info":
def get_browser_info():
- info = {
- "url": self.webview.get_uri(),
- "title": self.webview.get_title(),
- "can_go_back": self.webview.can_go_back(),
- "can_go_forward": self.webview.can_go_forward(),
- "is_loading": self.webview.is_loading()
- }
- asyncio.create_task(self.send_response(request_id, info))
+ try:
+ info = {
+ "is_ready": self.is_ready,
+ "is_closing": self.is_closing
+ }
+
+ if self.webview and self.is_ready:
+ try:
+ info["url"] = self.webview.get_uri()
+ except:
+ info["url"] = None
+
+ try:
+ info["title"] = self.webview.get_title()
+ except:
+ info["title"] = None
+
+ try:
+ info["is_loading"] = self.webview.is_loading()
+ except:
+ info["is_loading"] = False
+
+ try:
+ info["can_go_back"] = self.webview.can_go_back()
+ except:
+ info["can_go_back"] = False
+
+ try:
+ info["can_go_forward"] = self.webview.can_go_forward()
+ except:
+ info["can_go_forward"] = False
+
+ coro = self.send_response(request_id, info)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"Error getting browser info: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
GLib.idle_add(get_browser_info)
-
+
elif cmd_type == "screenshot":
- def take_screenshot():
- def on_snapshot(webview, task, data):
- try:
- surface = webview.get_snapshot_finish(task)
-
- import io
- import cairo
-
- buf = io.BytesIO()
- surface.write_to_png(buf)
- buf.seek(0)
-
-
- screenshot_b64 = base64.b64encode(buf.read()).decode('utf-8')
- asyncio.create_task(self.send_response(request_id, {
- "screenshot": screenshot_b64,
- "format": "png"
- }))
- except Exception as e:
- asyncio.create_task(self.send_response(request_id, None, str(e)))
+ try:
+ with self._gtk_lock:
+ self.pending_callbacks[request_id] = PendingCallback(request_id, time.time(), timeout=60)
+ except Exception as e:
+ await self.send_response(request_id, None, f"Error preparing screenshot: {str(e)}")
+ return
- self.webview.get_snapshot(
- WebKit2.SnapshotRegion.FULL_DOCUMENT,
- WebKit2.SnapshotOptions.NONE,
- None,
- on_snapshot,
- None
- )
- GLib.idle_add(take_screenshot)
+ def on_snapshot(webview, task, data):
+ try:
+ with self._gtk_lock:
+ callback = self.pending_callbacks.pop(request_id, None)
+ if not callback:
+ return
+ except Exception as e:
+ logger.error(f"Error retrieving screenshot callback: {e}")
+ return
+
+ try:
+ surface = webview.get_snapshot_finish(task)
+
+ # Save to bytes
+ import io
+ import cairo
+ buf = io.BytesIO()
+ surface.write_to_png(buf)
+ buf.seek(0)
+
+ screenshot_b64 = base64.b64encode(buf.read()).decode('utf-8')
+ coro = self.send_response(request_id, {"screenshot": screenshot_b64, "format": "png"})
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ error_msg = f"Error capturing screenshot: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ def take_snapshot():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.get_snapshot(
+ WebKit2.SnapshotRegion.FULL_DOCUMENT,
+ WebKit2.SnapshotOptions.NONE,
+ None,
+ on_snapshot,
+ None
+ )
+ else:
+ with self._gtk_lock:
+ self.pending_callbacks.pop(request_id, None)
+ coro = self.send_response(request_id, None, "Browser not available")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ with self._gtk_lock:
+ self.pending_callbacks.pop(request_id, None)
+ error_msg = f"Error starting screenshot: {str(e)}"
+ logger.error(error_msg)
+ coro = self.send_response(request_id, None, error_msg)
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ GLib.idle_add(take_snapshot)
+
elif cmd_type == "set_html":
html = command.get("html", "")
base_uri = command.get("base_uri")
- GLib.idle_add(self.webview.load_html, html, base_uri)
+
+ def safe_load_html():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.load_html(html, base_uri)
+ except Exception as e:
+ logger.error(f"Error loading HTML: {e}")
+
+ GLib.idle_add(safe_load_html)
await self.send_response(request_id, {"status": "html_loaded"})
+
+ # Playwright-compatible commands
+ elif cmd_type == "goto":
+ url = command.get("url")
+ options = command.get("options", {})
+ if not url:
+ await self.send_response(request_id, None, "URL is required")
+ return
+
+ # Basic URL validation
+ if not url.startswith(('http://', 'https://', 'file://', 'about:')):
+ url = 'https://' + url
+
+ def safe_navigate():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.load_uri(url)
+ except Exception as e:
+ logger.error(f"Error navigating to {url}: {e}")
+
+ GLib.idle_add(safe_navigate)
+ await self.send_response(request_id, {"status": "navigating", "url": url})
+
+ elif cmd_type == "evaluate":
+ expression = command.get("expression")
+ if not expression:
+ await self.send_response(request_id, None, "Expression is required")
+ return
+
+ GLib.idle_add(self.execute_javascript, expression, request_id)
+
+ elif cmd_type == "content":
+ script = "document.documentElement.outerHTML"
+ GLib.idle_add(self.execute_javascript, script, request_id)
+
+ elif cmd_type == "title":
+ script = "document.title"
+ GLib.idle_add(self.execute_javascript, script, request_id)
+
+ elif cmd_type == "url":
+ def get_url():
+ try:
+ if self.webview and self.is_ready:
+ url = self.webview.get_uri()
+ coro = self.send_response(request_id, url)
+ else:
+ coro = self.send_response(request_id, None, "Browser not ready")
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+ except Exception as e:
+ coro = self.send_response(request_id, None, str(e))
+ asyncio.run_coroutine_threadsafe(coro, self.loop)
+
+ GLib.idle_add(get_url)
+
+ elif cmd_type == "set_content":
+ html = command.get("html", "")
+ options = command.get("options", {})
+
+ def safe_load_html():
+ try:
+ if self.webview and not self.is_closing:
+ self.webview.load_html(html, None)
+ except Exception as e:
+ logger.error(f"Error loading HTML: {e}")
+
+ GLib.idle_add(safe_load_html)
+ await self.send_response(request_id, {"status": "html_loaded"})
+
+ elif cmd_type == "type":
+ selector = command.get("selector")
+ text = command.get("text", "")
+ options = command.get("options", {})
+ delay = options.get("delay", 0)
+
+ if not selector:
+ await self.send_response(request_id, None, "Selector is required")
+ return
+
+ GLib.idle_add(self.simulate_typing, selector, text, request_id, delay / 1000.0)
+
+ elif cmd_type == "playwright":
+ # Handle Playwright-style actions
+ action = command.get("action")
+ params = command.get("params", {})
+
+ if not action:
+ await self.send_response(request_id, None, "Action is required")
+ return
+
+ GLib.idle_add(self.execute_playwright_action, action, params, request_id)
+
+ elif cmd_type == "close":
+ await self.send_response(request_id, {"status": "closing"})
+ await self.close()
else:
await self.send_response(request_id, None, f"Unknown command: {cmd_type}")
except Exception as e:
- await self.send_response(request_id, None, str(e))
+ error_msg = f"Error handling command {cmd_type}: {str(e)}"
+ logger.error(f"{error_msg}\n{traceback.format_exc()}")
+ await self.send_response(request_id, None, error_msg)
async def close(self):
"""Close the browser window and cleanup."""
- if not self.is_closing:
- self.is_closing = True
+ if self.is_closing:
+ return
+ self.is_closing = True
+ logger.info(f"Closing browser for connection {self.connection_id}")
+
+ # Cancel cleanup task
+ if self._cleanup_task:
+ self._cleanup_task.cancel()
+ try:
+ await self._cleanup_task
+ except asyncio.CancelledError:
+ pass
+
+ # Send responses for all pending callbacks
+ async with self._lock:
+ for callback in list(self.pending_callbacks.values()):
+ await self.send_response(callback.request_id, None, "Browser closing")
+ self.pending_callbacks.clear()
+
def close_window():
- if self.window:
- self.window.destroy()
+ try:
+ with self._gtk_lock:
+ if self.window:
+ self.window.destroy()
+ self.window = None
+ self.webview = None
+ except Exception as e:
+ logger.error(f"Error closing window: {e}")
GLib.idle_add(close_window)
-
+ # Remove from server connections
if self.connection_id in self.server.connections:
del self.server.connections[self.connection_id]
-
class BrowserServer:
"""
WebSocket server that manages multiple browser instances.
"""
- def __init__(self, host: str = "localhost", port: int = 8765):
+ def __init__(self, host: str = "localhost", port: int = 8765, max_connections: int = 10):
self.host = host
self.port = port
+ self.max_connections = max_connections
self.connections: Dict[str, RemoteBrowser] = {}
self.gtk_thread = None
- self.loop = None
-
+ self.gtk_ready = threading.Event()
+ self.server = None
+ self.is_running = False
+ self.executor = ThreadPoolExecutor(max_workers=4)
+ self.loop: Optional[asyncio.AbstractEventLoop] = None
+ self._shutdown_lock = threading.Lock()
+
def start_gtk_thread(self):
"""Start GTK main loop in a separate thread."""
def gtk_main():
- Gtk.main()
+ try:
+ logger.info("Starting GTK main loop")
+ GLib.timeout_add(100, lambda: self.gtk_ready.set() or False)
+ Gtk.main()
+ except Exception as e:
+ logger.error(f"GTK main loop error: {e}")
+ finally:
+ logger.info("GTK main loop ended")
self.gtk_thread = threading.Thread(target=gtk_main, daemon=True)
self.gtk_thread.start()
+ # Wait for GTK to be ready
+ if not self.gtk_ready.wait(5):
+ raise Exception("GTK initialization timeout")
+
async def handle_connection(self, websocket, path):
"""Handle a new WebSocket connection."""
+ if len(self.connections) >= self.max_connections:
+ await websocket.close(1008, f"Maximum connections ({self.max_connections}) reached")
+ return
+
connection_id = str(uuid.uuid4())
- browser = RemoteBrowser(connection_id, websocket, self)
+ logger.info(f"New connection: {connection_id}")
+
+ browser = RemoteBrowser(connection_id, websocket, self, self.loop)
self.connections[connection_id] = browser
-
- GLib.idle_add(browser.create_browser)
-
-
- await websocket.send(json.dumps({
- "type": "connected",
- "connection_id": connection_id,
- "message": "Browser window created"
- }))
-
try:
+ # Start browser tasks
+ await browser.start()
+
+ # Create browser in GTK thread
+ future = self.loop.create_future()
+
+ def create_browser_wrapper():
+ try:
+ browser.create_browser()
+ if browser._creation_error:
+ self.loop.call_soon_threadsafe(future.set_exception,
+ Exception(browser._creation_error))
+ else:
+ self.loop.call_soon_threadsafe(future.set_result, True)
+ except Exception as e:
+ self.loop.call_soon_threadsafe(future.set_exception, e)
+
+ GLib.idle_add(create_browser_wrapper)
+
+ # Wait for browser creation with timeout
+ await asyncio.wait_for(future, timeout=15.0)
+
+ # Send connection confirmation
+ await websocket.send(json.dumps({
+ "type": "connected",
+ "connection_id": connection_id,
+ "message": "Browser window created",
+ "webkit_version": webkit_version
+ }))
+
+ # Handle incoming messages
async for message in websocket:
+ if browser.is_closing:
+ break
+
+ request_id = None
try:
command = json.loads(message)
+ request_id = command.get("request_id")
await browser.handle_command(command)
- except json.JSONDecodeError:
- await websocket.send(json.dumps({
- "type": "error",
- "error": "Invalid JSON"
- }))
+ except json.JSONDecodeError as e:
+ error_msg = f"Invalid JSON: {str(e)}"
+ logger.error(f"{error_msg} - Message: {message[:100]}...")
+ await browser.send_response(request_id, None, error_msg)
except Exception as e:
- await websocket.send(json.dumps({
- "type": "error",
- "error": str(e)
- }))
+ error_msg = f"Error handling message: {str(e)}"
+ logger.error(f"{error_msg} for request '{request_id}'\n{traceback.format_exc()}")
+ await browser.send_response(request_id, None, error_msg)
+
+ except asyncio.TimeoutError:
+ logger.error(f"Browser creation timeout for {connection_id}")
+ await browser.send_event("browser_error", {"error": "Browser creation timeout"})
+ except websockets.exceptions.ConnectionClosed:
+ logger.info(f"Connection closed: {connection_id}")
+ except Exception as e:
+ error_msg = f"Connection error {connection_id}: {str(e)}"
+ logger.error(f"{error_msg}\n{traceback.format_exc()}")
+ try:
+ await browser.send_event("server_error", {"error": str(e)})
+ except:
+ pass
finally:
-
+ logger.info(f"Cleaning up connection: {connection_id}")
await browser.close()
async def start_server(self):
"""Start the WebSocket server."""
- print(f"Starting WebSocket server on ws://{self.host}:{self.port}")
- await websockets.serve(self.handle_connection, self.host, self.port)
- await asyncio.Future()
+ logger.info(f"Starting WebSocket server on ws://{self.host}:{self.port}")
+
+ self.server = await websockets.serve(
+ self.handle_connection,
+ self.host,
+ self.port,
+ max_size=10**7, # 10MB max message size
+ ping_interval=20,
+ ping_timeout=10,
+ compression=None # Disable compression for stability
+ )
+
+ self.is_running = True
+ logger.info("WebSocket server started successfully")
+
+ # Wait for server to close
+ await self.server.wait_closed()
+
+ async def shutdown(self):
+ """Gracefully shutdown the server."""
+ with self._shutdown_lock:
+ if not self.is_running:
+ return
+
+ logger.info("Shutting down server...")
+ self.is_running = False
+
+ # Close all browser connections
+ browsers = list(self.connections.values())
+ if browsers:
+ await asyncio.gather(
+ *[browser.close() for browser in browsers],
+ return_exceptions=True
+ )
+
+ # Close WebSocket server
+ if self.server:
+ self.server.close()
+ await self.server.wait_closed()
+
+ # Shutdown executor
+ self.executor.shutdown(wait=True)
+
+ # Stop GTK main loop
+ def quit_gtk():
+ try:
+ Gtk.main_quit()
+ except:
+ pass
+
+ GLib.idle_add(quit_gtk)
+
+ # Wait for GTK thread to finish
+ if self.gtk_thread and self.gtk_thread.is_alive():
+ self.gtk_thread.join(timeout=5)
+
+ logger.info("Server shutdown complete")
async def run(self):
- """Run the server."""
-
- self.start_gtk_thread()
-
-
- await self.start_server()
+ """Run the server with proper error handling."""
+ try:
+ self.loop = asyncio.get_running_loop()
+ self.start_gtk_thread()
+ await self.start_server()
+ except KeyboardInterrupt:
+ logger.info("Received interrupt signal")
+ except Exception as e:
+ logger.error(f"Server error: {e}\n{traceback.format_exc()}")
+ raise
+ finally:
+ await self.shutdown()
-
-
-CLIENT_EXAMPLE = '''
-import asyncio
-import websockets
-import json
-
-async def test_browser():
- """Example client that controls the browser."""
- uri = "ws://localhost:8765"
+def main():
+ """Main entry point with proper signal handling."""
+ import signal
- async with websockets.connect(uri) as websocket:
- # Wait for connection confirmation
- response = await websocket.recv()
- print(f"Connected: {response}")
-
- # Navigate to a website
- await websocket.send(json.dumps({
- "command": "navigate",
- "url": "https://www.example.com",
- "request_id": "nav1"
- }))
-
- # Wait for navigation response
- response = await websocket.recv()
- print(f"Navigation response: {response}")
-
- # Wait a bit for page to load
- await asyncio.sleep(2)
-
- # Execute JavaScript
- await websocket.send(json.dumps({
- "command": "execute_js",
- "script": "document.title",
- "request_id": "js1"
- }))
-
- response = await websocket.recv()
- print(f"Page title: {response}")
-
- # Get browser info
- await websocket.send(json.dumps({
- "command": "get_info",
- "request_id": "info1"
- }))
-
- response = await websocket.recv()
- print(f"Browser info: {response}")
-
- # Take screenshot
- await websocket.send(json.dumps({
- "command": "screenshot",
- "request_id": "screenshot1"
- }))
-
- response = await websocket.recv()
- result = json.loads(response)
- if result.get("result") and result["result"].get("screenshot"):
- print("Screenshot captured!")
- # You can save the base64 image here
-
- # Load custom HTML
- await websocket.send(json.dumps({
- "command": "set_html",
- "html": "Hello from WebSocket!
This is custom HTML
",
- "request_id": "html1"
- }))
-
- response = await websocket.recv()
- print(f"HTML loaded: {response}")
-
- # Keep connection open for events
- while True:
- try:
- message = await asyncio.wait_for(websocket.recv(), timeout=30)
- print(f"Event: {message}")
- except asyncio.TimeoutError:
- break
-
-asyncio.run(test_browser())
-'''
-
-
-if __name__ == "__main__":
-
- server = BrowserServer(host="localhost", port=8765)
+ server = BrowserServer()
- print("WebSocket Browser Server")
- print("=" * 50)
- print(f"Server will run on ws://localhost:8765")
- print("\nAvailable commands:")
- print("- navigate: Load a URL")
- print("- execute_js: Execute JavaScript and get result")
- print("- go_back/go_forward: Navigate history")
- print("- reload/stop: Reload or stop loading")
- print("- get_info: Get browser state info")
- print("- screenshot: Take a screenshot (base64)")
- print("- set_html: Load custom HTML")
- print("\nExample client code:")
- print("-" * 50)
- print(CLIENT_EXAMPLE)
- print("-" * 50)
+ # Setup signal handlers
+ def signal_handler(sig, frame):
+ logger.info(f"Received signal {sig}")
+ asyncio.create_task(server.shutdown())
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
try:
asyncio.run(server.run())
except KeyboardInterrupt:
- print("\nShutting down server...")
- Gtk.main_quit()
+ logger.info("Exiting.")
+ except Exception as e:
+ logger.error(f"Fatal error: {e}")
+ exit(1)
+if __name__ == "__main__":
+ main()