#!/usr/bin/env python3
"""
Playwright-compatible WebSocket Browser Client
Provides a Playwright-like API for controlling remote browsers via WebSocket.
"""
import asyncio
import websockets
import json
import uuid
import time
import base64
import logging
from typing import Optional, Dict, Any, List, Callable, Union
from dataclasses import dataclass
from contextlib import asynccontextmanager
import weakref
# Configure logging
# NOTE: basicConfig() runs when this module is imported, so importing the
# module is what applies the INFO level and the message format below.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
class TimeoutError(Exception):
    """Signal that an operation did not finish within its time limit.

    NOTE: inside this module the name shadows the built-in ``TimeoutError``;
    this class derives directly from ``Exception``.
    """
class BrowserError(Exception):
    """Signal that a browser operation could not be carried out."""
@dataclass
class ElementHandle:
    """Selector-backed reference to a DOM element.

    The handle stores only the owning page and a selector string; every
    method forwards to the page, addressing the element by that selector.
    """
    page: 'Page'
    selector: str

    async def _act(self, action: str, **extra) -> Any:
        """Run *action* on the page with this selector plus any extra fields."""
        payload = {"selector": self.selector, **extra}
        return await self.page._playwright_action(action, payload)

    async def _read(self, action: str, field: str, fallback: Any, **extra) -> Any:
        """Run *action* and return *field* from a successful reply, else *fallback*."""
        reply = await self._act(action, **extra)
        if reply.get("success"):
            return reply.get(field, fallback)
        return fallback

    async def click(self, **options):
        """Click this element, forwarding *options* with the action."""
        return await self._act("click", options=options)

    async def fill(self, value: str):
        """Fill this element with *value*."""
        return await self._act("fill", value=value)

    async def type(self, text: str, delay: int = 0):
        """Type *text* into this element via the page's ``type`` method."""
        return await self.page.type(self.selector, text, delay=delay)

    async def press(self, key: str):
        """Press *key* on this element."""
        return await self._act("press", key=key)

    async def hover(self):
        """Hover over this element."""
        return await self._act("hover")

    async def focus(self):
        """Give focus to this element."""
        return await self._act("focus")

    async def get_attribute(self, name: str) -> Optional[str]:
        """Return attribute *name*, or ``None`` when the action did not succeed."""
        return await self._read("get_attribute", "value", None, name=name)

    async def inner_text(self) -> str:
        """Return the inner text, or ``""`` when the action did not succeed."""
        return await self._read("inner_text", "text", "")

    async def inner_html(self) -> str:
        """Return the inner HTML, or ``""`` when the action did not succeed."""
        return await self._read("inner_html", "html", "")

    async def is_visible(self) -> bool:
        """Return the reported visibility; ``False`` when the action did not succeed."""
        return await self._read("is_visible", "visible", False)

    async def is_enabled(self) -> bool:
        """Return the reported enabled state; ``False`` when the action did not succeed."""
        return await self._read("is_enabled", "enabled", False)

    async def is_checked(self) -> bool:
        """Return the reported checked state; ``False`` when the action did not succeed."""
        return await self._read("is_checked", "checked", False)

    async def check(self):
        """Check this checkbox or radio button."""
        return await self._act("check")

    async def uncheck(self):
        """Uncheck this checkbox."""
        return await self._act("uncheck")

    async def select_option(self, values: Union[str, List[str]]):
        """Select *values* (one string or a list) in this select element."""
        return await self._act("select_option", values=values)
class Locator:
    """Playwright-style locator: a page plus a selector, resolved on each call.

    Actions are forwarded to the page; read-only queries go through a freshly
    built ``ElementHandle`` for the same selector.
    """

    def __init__(self, page: 'Page', selector: str):
        self.page = page
        self.selector = selector

    def _handle(self) -> ElementHandle:
        """Build an element handle for this locator's page and selector."""
        return ElementHandle(self.page, self.selector)

    async def click(self, **options):
        """Click via the page, forwarding *options*."""
        return await self.page.click(self.selector, **options)

    async def fill(self, value: str):
        """Fill with *value* via the page."""
        return await self.page.fill(self.selector, value)

    async def type(self, text: str, delay: int = 0):
        """Type *text* via the page, passing *delay* through."""
        return await self.page.type(self.selector, text, delay=delay)

    async def press(self, key: str):
        """Press *key* via the page."""
        return await self.page.press(self.selector, key)

    async def hover(self):
        """Hover via the page."""
        return await self.page.hover(self.selector)

    async def focus(self):
        """Focus via the page."""
        return await self.page.focus(self.selector)

    async def get_attribute(self, name: str) -> Optional[str]:
        """Return attribute *name* as reported by an element handle."""
        return await self._handle().get_attribute(name)

    async def inner_text(self) -> str:
        """Return the inner text as reported by an element handle."""
        return await self._handle().inner_text()

    async def inner_html(self) -> str:
        """Return the inner HTML as reported by an element handle."""
        return await self._handle().inner_html()

    async def is_visible(self) -> bool:
        """Return the visibility as reported by an element handle."""
        return await self._handle().is_visible()

    async def is_enabled(self) -> bool:
        """Return the enabled state as reported by an element handle."""
        return await self._handle().is_enabled()

    async def is_checked(self) -> bool:
        """Return the checked state as reported by an element handle."""
        return await self._handle().is_checked()

    async def check(self):
        """Check the checkbox or radio via the page."""
        return await self.page.check(self.selector)

    async def uncheck(self):
        """Uncheck the checkbox via the page."""
        return await self.page.uncheck(self.selector)

    async def select_option(self, values: Union[str, List[str]]):
        """Select *values* via the page."""
        return await self.page.select_option(self.selector, values)

    async def count(self) -> int:
        """Return the ``count`` from a ``query_selector_all`` action, or 0 on failure."""
        reply = await self.page._playwright_action(
            "query_selector_all", {"selector": self.selector}
        )
        if not reply.get("success"):
            return 0
        return reply.get("count", 0)

    async def first(self) -> ElementHandle:
        """Return an element handle for this selector (a coroutine here, so it must be awaited)."""
        return self._handle()

    async def wait_for(self, **options):
        """Wait for the selector via the page's ``wait_for_selector``."""
        return await self.page.wait_for_selector(self.selector, **options)
class Page:
    """Represents a browser page with Playwright-compatible API.

    Each operation is packaged as a command dict and sent through the owning
    browser's ``_send_command``. Selector-based actions are wrapped as
    ``"playwright"`` commands by ``_playwright_action``.
    """

    def __init__(self, browser: 'Browser', connection_id: str):
        """Bind the page to *browser* and record its *connection_id*."""
        self.browser = browser
        self.connection_id = connection_id
        self._closed = False
        # event name -> callbacks registered through on()/once()
        self._event_listeners: Dict[str, List[Callable]] = {}

    async def goto(self, url: str, **options) -> Dict[str, Any]:
        """Navigate to *url*; *options* are sent along with the command."""
        return await self.browser._send_command({
            "command": "goto",
            "url": url,
            "options": options
        })

    async def go_back(self, **options) -> Dict[str, Any]:
        """Navigate back. *options* are accepted but not sent."""
        return await self.browser._send_command({"command": "go_back"})

    async def go_forward(self, **options) -> Dict[str, Any]:
        """Navigate forward. *options* are accepted but not sent."""
        return await self.browser._send_command({"command": "go_forward"})

    async def reload(self, **options) -> Dict[str, Any]:
        """Reload the page. *options* are accepted but not sent."""
        return await self.browser._send_command({"command": "reload"})

    async def set_content(self, html: str, **options) -> Dict[str, Any]:
        """Set the page content to *html*; *options* are sent along."""
        return await self.browser._send_command({
            "command": "set_content",
            "html": html,
            "options": options
        })

    async def content(self) -> str:
        """Return the page content, or ``""`` if the reply is not a string."""
        result = await self.browser._send_command({"command": "content"})
        return result if isinstance(result, str) else ""

    async def title(self) -> str:
        """Return the page title, or ``""`` if the reply is not a string."""
        result = await self.browser._send_command({"command": "title"})
        return result if isinstance(result, str) else ""

    async def url(self) -> str:
        """Return the page URL, or ``""`` if the reply is not a string."""
        result = await self.browser._send_command({"command": "url"})
        return result if isinstance(result, str) else ""

    async def evaluate(self, expression: str, arg=None) -> Any:
        """Evaluate JavaScript in the page.

        When *arg* is not ``None`` the expression is wrapped as a call,
        ``(expression)(<arg as JSON>)``, so *arg* must be JSON-serializable.
        """
        if arg is not None:
            # Simple argument serialization
            expression = f"({expression})({json.dumps(arg)})"
        return await self.browser._send_command({
            "command": "evaluate",
            "expression": expression
        })

    async def evaluate_handle(self, expression: str, arg=None) -> Any:
        """Evaluate JavaScript (simplified: returns the value, not a handle)."""
        return await self.evaluate(expression, arg)

    async def screenshot(self, **options) -> bytes:
        """Take a screenshot and return the decoded image bytes.

        Raises:
            BrowserError: if the reply is not a dict with a ``"screenshot"``
                entry (which is expected to hold base64 text).
        """
        result = await self.browser._send_command({
            "command": "screenshot",
            "options": options
        })
        if isinstance(result, dict) and "screenshot" in result:
            return base64.b64decode(result["screenshot"])
        raise BrowserError("Failed to capture screenshot")

    async def click(self, selector: str, **options) -> Dict[str, Any]:
        """Click the element matching *selector*; *options* are sent along."""
        return await self._playwright_action("click", {"selector": selector, "options": options})

    async def dblclick(self, selector: str, **options) -> Dict[str, Any]:
        """Double-click an element, simulated as two consecutive clicks.

        Returns the result of the second click.
        """
        await self.click(selector, **options)
        return await self.click(selector, **options)

    async def fill(self, selector: str, value: str, **options) -> Dict[str, Any]:
        """Fill an input element. *options* are accepted but not sent."""
        return await self._playwright_action("fill", {"selector": selector, "value": value})

    async def type(self, selector: str, text: str, delay: int = 0) -> Dict[str, Any]:
        """Type *text* into an element, passing *delay* in the command options."""
        return await self.browser._send_command({
            "command": "type",
            "selector": selector,
            "text": text,
            "options": {"delay": delay}
        })

    async def press(self, selector: str, key: str, **options) -> Dict[str, Any]:
        """Press *key* on an element. *options* are accepted but not sent."""
        return await self._playwright_action("press", {"selector": selector, "key": key})

    async def check(self, selector: str, **options) -> Dict[str, Any]:
        """Check a checkbox or radio button. *options* are accepted but not sent."""
        return await self._playwright_action("check", {"selector": selector})

    async def uncheck(self, selector: str, **options) -> Dict[str, Any]:
        """Uncheck a checkbox. *options* are accepted but not sent."""
        return await self._playwright_action("uncheck", {"selector": selector})

    async def select_option(self, selector: str, values: Union[str, List[str]], **options) -> Dict[str, Any]:
        """Select option(s) in a select element. *options* are accepted but not sent."""
        return await self._playwright_action("select_option", {"selector": selector, "values": values})

    async def hover(self, selector: str, **options) -> Dict[str, Any]:
        """Hover over an element. *options* are accepted but not sent."""
        return await self._playwright_action("hover", {"selector": selector})

    async def focus(self, selector: str, **options) -> Dict[str, Any]:
        """Focus an element. *options* are accepted but not sent."""
        return await self._playwright_action("focus", {"selector": selector})

    async def wait_for_selector(self, selector: str, **options) -> Optional[ElementHandle]:
        """Wait for *selector*; return a handle on success, otherwise ``None``."""
        result = await self._playwright_action("wait_for_selector", {
            "selector": selector,
            "options": options
        })
        if result.get("success"):
            return ElementHandle(self, selector)
        return None

    async def wait_for_load_state(self, state: str = "load", **options) -> None:
        """Wait for load state *state*; ``timeout`` defaults to 30000."""
        await self._playwright_action("wait_for_load_state", {
            "state": state,
            "timeout": options.get("timeout", 30000)
        })

    async def wait_for_timeout(self, timeout: int) -> None:
        """Send a ``wait_for_timeout`` action carrying *timeout*."""
        await self._playwright_action("wait_for_timeout", {"timeout": timeout})

    async def wait_for_function(self, expression: str, **options) -> Any:
        """Poll *expression* until it evaluates to a truthy value.

        Simplified implementation: the expression is re-evaluated through
        ``evaluate`` on every poll.

        Options:
            timeout: overall limit in milliseconds (default 30000).
            polling: pause between evaluations in milliseconds (default 100).
            arg: optional argument handed to ``evaluate`` on each poll.

        Returns:
            The first truthy evaluation result.

        Raises:
            TimeoutError: if no truthy result is seen before the limit.
        """
        timeout = options.get("timeout", 30000)
        polling = options.get("polling", 100)
        arg = options.get("arg")
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout / 1000
        while time.monotonic() < deadline:
            result = await self.evaluate(expression, arg)
            if result:
                return result
            await asyncio.sleep(polling / 1000)
        raise TimeoutError(f"Timeout waiting for function: {expression}")

    def locator(self, selector: str) -> Locator:
        """Create a locator for the given selector."""
        return Locator(self, selector)

    async def query_selector(self, selector: str) -> Optional[ElementHandle]:
        """Return a handle if the reply reports ``success`` and ``found``, else ``None``."""
        result = await self._playwright_action("query_selector", {"selector": selector})
        if result.get("success") and result.get("found"):
            return ElementHandle(self, selector)
        return None

    async def query_selector_all(self, selector: str) -> List[ElementHandle]:
        """Return one handle per reported match (empty list on failure).

        NOTE(review): handles are built by appending ``:nth-of-type(i)`` to
        *selector*. That picks elements by position among same-type siblings,
        which is not necessarily the i-th match of *selector* — confirm this
        approximation is acceptable for callers.
        """
        result = await self._playwright_action("query_selector_all", {"selector": selector})
        if result.get("success"):
            count = result.get("count", 0)
            # For simplicity, we return handles with indexed selectors
            return [ElementHandle(self, f"{selector}:nth-of-type({i+1})") for i in range(count)]
        return []

    async def _playwright_action(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a ``"playwright"`` command for *action* and normalize the reply.

        A string reply is parsed as JSON; if parsing fails a
        ``{"success": False, "error": ...}`` dict is returned instead.
        A falsy reply becomes ``{}``.
        """
        result = await self.browser._send_command({
            "command": "playwright",
            "action": action,
            "params": params
        })
        # Handle string results from JavaScript execution
        if isinstance(result, str):
            try:
                return json.loads(result)
            except json.JSONDecodeError:
                return {"success": False, "error": f"Invalid response: {result}"}
        return result or {}

    def on(self, event: str, callback: Callable) -> None:
        """Register *callback* for *event*."""
        self._event_listeners.setdefault(event, []).append(callback)

    def once(self, event: str, callback: Callable) -> None:
        """Register *callback* for *event*; it is removed before its first call."""
        def wrapper(*args, **kwargs):
            self.remove_listener(event, wrapper)
            return callback(*args, **kwargs)
        self.on(event, wrapper)

    def remove_listener(self, event: str, callback: Callable) -> None:
        """Remove every registration of *callback* for *event*."""
        if event in self._event_listeners:
            self._event_listeners[event] = [
                cb for cb in self._event_listeners[event] if cb != callback
            ]

    def emit(self, event: str, data: Any) -> None:
        """Call every listener of *event* with *data*.

        Listeners are called synchronously; an exception raised by one is
        logged and does not stop the remaining listeners.
        """
        if event in self._event_listeners:
            # Iterate over a copy: once() wrappers remove themselves mid-loop.
            for callback in self._event_listeners[event][:]:
                try:
                    callback(data)
                except Exception as e:
                    logger.error(f"Error in event listener for {event}: {e}")

    async def close(self) -> None:
        """Close the page (no-op if already closed).

        The page is flagged closed before the ``"close"`` command is sent, so
        a failing command still leaves the page marked as closed.
        """
        if not self._closed:
            self._closed = True
            await self.browser._send_command({"command": "close"})
class BrowserContext:
    """Browser context (simplified for single-page support)."""

    def __init__(self, browser: 'Browser'):
        """Attach the context to *browser* with an empty page list."""
        self.browser = browser
        self.pages: List[Page] = []

    async def new_page(self) -> Page:
        """Create a new page (limited to one open page per context).

        Pages already closed through ``Page.close()`` are dropped from
        ``pages`` first, so a closed page no longer blocks creating a new one.

        Raises:
            BrowserError: if the context still holds an open page.
        """
        # In-place update keeps any external reference to the list valid.
        self.pages[:] = [page for page in self.pages if not page._closed]
        if self.pages:
            raise BrowserError("Multiple pages not supported in this implementation")
        page = Page(self.browser, self.browser.connection_id)
        self.pages.append(page)
        return page

    async def close(self) -> None:
        """Close every page of the context and forget them.

        The page list is cleared even when closing a page raises; the
        exception still propagates to the caller.
        """
        try:
            for page in self.pages[:]:
                await page.close()
        finally:
            self.pages.clear()
class Browser:
    """Represents a browser instance with Playwright-compatible API.

    Owns the WebSocket connection, a background task that reads incoming
    messages, and the table of futures waiting for command responses.
    """

    def __init__(self, ws_url: str = "ws://localhost:8765"):
        """Store the endpoint; no connection is opened until ``connect()``."""
        self.ws_url = ws_url
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        # Set by the receive loop when a "connected" message arrives.
        self.connection_id: Optional[str] = None
        # request_id -> future resolved by the matching "response" message
        self._response_futures: Dict[str, asyncio.Future] = {}
        self._event_listeners: Dict[str, List[Callable]] = {}
        self._receive_task: Optional[asyncio.Task] = None
        self._closed = False
        self.contexts: List[BrowserContext] = []

    async def connect(self) -> None:
        """Open the WebSocket and wait up to 15 seconds for a connection id.

        Raises:
            TimeoutError: if no "connected" message was processed in time or
                the receive loop ended first. The socket and the receive task
                are released before raising.
        """
        self.websocket = await websockets.connect(self.ws_url)
        self._receive_task = asyncio.create_task(self._receive_messages())
        # Wait for connection confirmation
        timeout = 15
        deadline = time.monotonic() + timeout
        while not self.connection_id and time.monotonic() < deadline:
            if self._receive_task.done():
                # The receive loop is gone, so the confirmation cannot arrive.
                break
            await asyncio.sleep(0.1)
        if not self.connection_id:
            # Do not leak the socket or the background task on failure.
            await self._shutdown_transport()
            self.websocket = None
            self._receive_task = None
            raise TimeoutError("Failed to establish browser connection")

    async def _shutdown_transport(self) -> None:
        """Cancel the receive task (if any) and close the WebSocket (if any)."""
        task = self._receive_task
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        if self.websocket:
            await self.websocket.close()

    async def _receive_messages(self) -> None:
        """Read messages from the WebSocket and dispatch them by ``type``.

        ``connected`` stores the connection id, ``response`` resolves the
        pending future for its ``request_id``, and ``event`` is emitted to
        browser-level listeners and to every page of every context. When the
        loop ends, all still-pending futures fail with ``BrowserError``.
        """
        try:
            async for message in self.websocket:
                try:
                    data = json.loads(message)
                    msg_type = data.get("type")
                    if msg_type == "connected":
                        self.connection_id = data.get("connection_id")
                        logger.info(f"Connected with ID: {self.connection_id}")
                    elif msg_type == "response":
                        future = self._response_futures.pop(data.get("request_id"), None)
                        # Skip futures already finished (e.g. cancelled by a timeout).
                        if future is not None and not future.done():
                            if data.get("error"):
                                future.set_exception(BrowserError(data["error"]))
                            else:
                                future.set_result(data.get("result"))
                    elif msg_type == "event":
                        event_type = data.get("event")
                        event_data = data.get("data", {})
                        # Emit to browser-level listeners
                        self._emit_event(event_type, event_data)
                        # Emit to page-level listeners if applicable
                        for context in self.contexts:
                            for page in context.pages:
                                page.emit(event_type, event_data)
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse message: {e}")
                except Exception as e:
                    # One bad message must not stop the receive loop.
                    logger.error(f"Error handling message: {e}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("WebSocket connection closed")
        except Exception as e:
            logger.error(f"Error in receive loop: {e}")
        finally:
            # Clean up pending futures
            for future in self._response_futures.values():
                if not future.done():
                    future.set_exception(BrowserError("Connection closed"))
            self._response_futures.clear()

    async def _send_command(self, command: Dict[str, Any]) -> Any:
        """Send *command* and wait for the matching response.

        A fresh ``request_id`` is added to a copy of *command*; the caller's
        dict is left untouched. The wait limit is ``command["timeout"]`` in
        seconds when present, otherwise 30.

        Raises:
            BrowserError: if the connection is closed or the command fails.
            TimeoutError: if no response arrives within the limit.
        """
        if not self.websocket or self._closed:
            raise BrowserError("WebSocket connection is closed")
        request_id = str(uuid.uuid4())
        payload = {**command, "request_id": request_id}
        # Create future for response
        future = asyncio.get_running_loop().create_future()
        self._response_futures[request_id] = future
        try:
            await self.websocket.send(json.dumps(payload))
            # Wait for response with timeout
            timeout = command.get("timeout", 30)
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError as e:
            raise TimeoutError(f"Command timeout: {command.get('command')}") from e
        except Exception as e:
            raise BrowserError(f"Command failed: {e}") from e
        finally:
            # Also runs on cancellation, so no stale future stays registered.
            self._response_futures.pop(request_id, None)

    def _emit_event(self, event: str, data: Any) -> None:
        """Call every browser-level listener of *event* with *data*.

        An exception raised by one listener is logged and does not stop the others.
        """
        if event in self._event_listeners:
            # Iterate over a copy: once() wrappers remove themselves mid-loop.
            for callback in self._event_listeners[event][:]:
                try:
                    callback(data)
                except Exception as e:
                    logger.error(f"Error in browser event listener for {event}: {e}")

    def on(self, event: str, callback: Callable) -> None:
        """Register a browser-level listener for *event*."""
        self._event_listeners.setdefault(event, []).append(callback)

    def once(self, event: str, callback: Callable) -> None:
        """Register a browser-level listener that is removed before its first call."""
        def wrapper(*args, **kwargs):
            self.remove_listener(event, wrapper)
            return callback(*args, **kwargs)
        self.on(event, wrapper)

    def remove_listener(self, event: str, callback: Callable) -> None:
        """Remove every registration of *callback* for *event*."""
        if event in self._event_listeners:
            self._event_listeners[event] = [
                cb for cb in self._event_listeners[event] if cb != callback
            ]

    async def new_context(self, **options) -> BrowserContext:
        """Create and track a new browser context. *options* are accepted but unused."""
        context = BrowserContext(self)
        self.contexts.append(context)
        return context

    async def new_page(self) -> Page:
        """Create a new page in the first context, creating that context if needed."""
        if not self.contexts:
            context = await self.new_context()
        else:
            context = self.contexts[0]
        return await context.new_page()

    async def close(self) -> None:
        """Close contexts, stop the receive task and close the WebSocket.

        Calling it again after it has completed is a no-op. Closing contexts
        is best-effort: a failure is logged and the transport is still shut down.
        """
        if self._closed:
            return
        try:
            # Contexts must be closed BEFORE _closed is set: pages send their
            # "close" command through _send_command, which refuses to run once
            # the browser is flagged closed.
            for context in self.contexts[:]:
                try:
                    await context.close()
                except (BrowserError, TimeoutError) as e:
                    logger.warning(f"Error closing browser context: {e}")
        finally:
            self._closed = True
            await self._shutdown_transport()
        logger.info("Browser closed")

    @property
    def is_connected(self) -> bool:
        """Whether a socket exists, is not closed, and a connection id was received."""
        ws = self.websocket
        if not ws or not self.connection_id:
            return False
        closed = getattr(ws, "closed", None)
        if closed is None:
            # NOTE(review): fallback for connection objects that expose a
            # ``state`` enum instead of ``closed`` — confirm against the
            # installed websockets version.
            state = getattr(ws, "state", None)
            closed = state is not None and getattr(state, "name", "OPEN") != "OPEN"
        return not closed
class Playwright:
    """Entry point mirroring Playwright's ``chromium`` / ``firefox`` / ``webkit`` launchers.

    All three launchers end up in the same code path: ``firefox`` and
    ``webkit`` delegate to ``chromium.launch``.
    """

    def __init__(self):
        # Created empty; the static launchers below do not add to it.
        self.browsers: List[Browser] = []

    class chromium:
        """Chromium browser launcher (uses WebKit in this implementation)"""

        @staticmethod
        async def launch(**options) -> Browser:
            """Connect a ``Browser`` to ``ws_endpoint`` (default ``ws://localhost:8765``)."""
            endpoint = options.get("ws_endpoint", "ws://localhost:8765")
            instance = Browser(endpoint)
            await instance.connect()
            return instance

    class firefox:
        """Firefox browser launcher (uses WebKit in this implementation)"""

        @staticmethod
        async def launch(**options) -> Browser:
            """Delegate to ``Playwright.chromium.launch`` with the same options."""
            launched = await Playwright.chromium.launch(**options)
            return launched

    class webkit:
        """WebKit browser launcher"""

        @staticmethod
        async def launch(**options) -> Browser:
            """Delegate to ``Playwright.chromium.launch`` with the same options."""
            launched = await Playwright.chromium.launch(**options)
            return launched
# Convenience function for async context manager
@asynccontextmanager
async def browser(**options):
    """Yield a launched browser and close it when the ``async with`` block exits.

    *options* are passed to ``Playwright.chromium.launch``; the browser is
    closed even if the block raises.
    """
    instance = await Playwright().chromium.launch(**options)
    try:
        yield instance
    finally:
        await instance.close()
# Convenience function for creating a browser and page
async def launch(**options) -> Browser:
    """Launch a browser through a fresh ``Playwright`` object's chromium launcher."""
    launcher = Playwright().chromium
    return await launcher.launch(**options)
# Example usage functions for compatibility
async def goto(page: Page, url: str, **options):
    """Function-style shortcut for ``page.goto(url, **options)``."""
    outcome = await page.goto(url, **options)
    return outcome
async def screenshot(page: Page, path: str = None, **options):
    """Capture a screenshot of *page* and optionally save it.

    The image bytes are always returned; when *path* is truthy they are also
    written to that file in binary mode.
    """
    image = await page.screenshot(**options)
    if not path:
        return image
    with open(path, 'wb') as out:
        out.write(image)
    return image
async def evaluate(page: Page, expression: str, arg=None):
    """Function-style shortcut for ``page.evaluate(expression, arg)``."""
    value = await page.evaluate(expression, arg)
    return value
# Selector engine helpers
def css(selector: str) -> str:
    """Mark *selector* as CSS at the call site.

    The selector is returned exactly as given; nothing is validated or
    rewritten.
    """
    return selector
def xpath(selector: str) -> str:
    """Mark *selector* as XPath at the call site.

    The selector is returned exactly as given: no XPath-to-CSS conversion is
    performed, so expressions such as ``//div`` need dedicated handling by
    whatever consumes the selector.
    """
    return selector
def text(text_content: str) -> str:
    """Build an XPath expression matching elements whose text contains *text_content*.

    The text is embedded as an XPath string literal. XPath 1.0 has no escape
    character, so the quoting style is chosen from the content:
    single quotes by default, double quotes when the text contains a single
    quote, and ``concat()`` when it contains both kinds.
    """
    if "'" not in text_content:
        literal = f"'{text_content}'"
    elif '"' not in text_content:
        literal = f'"{text_content}"'
    else:
        # Split on single quotes and stitch the pieces back together with a
        # double-quoted "'" between them.
        pieces = (f"'{piece}'" for piece in text_content.split("'"))
        literal = "concat(" + ", \"'\", ".join(pieces) + ")"
    return f"//*[contains(text(), {literal})]"