import asyncio import websockets import json async def create_browser_window(window_id: int, url: str): """Create and control a single browser window.""" uri = "ws://localhost:8765" async with websockets.connect(uri) as websocket: response = await websocket.recv() conn_data = json.loads(response) print(f"Window {window_id} connected: {conn_data['connection_id'][:8]}") await websocket.send(json.dumps({ "command": "navigate", "url": url, "request_id": f"nav_{window_id}" })) await websocket.recv() await asyncio.sleep(2) await websocket.send(json.dumps({ "command": "execute_js", "script": f"document.body.style.backgroundColor = '#{window_id:02x}0000'; document.title", "request_id": f"js_{window_id}" })) response = await websocket.recv() data = json.loads(response) print(f"Window {window_id} - Title: {data.get('result')}") await asyncio.sleep(10) print(f"Window {window_id} closing...") async def parallel_browser_demo(): """Demo: Open multiple browser windows in parallel.""" urls = [ "https://www.python.org", "https://www.github.com", "https://www.example.com", "https://www.wikipedia.org" ] tasks = [] for i, url in enumerate(urls): task = asyncio.create_task(create_browser_window(i + 1, url)) tasks.append(task) await asyncio.sleep(0.5) await asyncio.gather(*tasks) print("All browser windows closed.") async def automated_testing_demo(): """Demo: Automated testing across multiple sites.""" test_sites = [ {"url": "https://www.example.com", "selector": "h1"}, {"url": "https://www.python.org", "selector": ".introduction h1"}, {"url": "https://httpbin.org/html", "selector": "h1"}, ] async def test_site(site_info): uri = "ws://localhost:8765" async with websockets.connect(uri) as websocket: await websocket.recv() await websocket.send(json.dumps({ "command": "navigate", "url": site_info["url"] })) await websocket.recv() await asyncio.sleep(3) await websocket.send(json.dumps({ "command": "execute_js", "script": f"document.querySelector('{site_info['selector']}')?.textContent || 'Not found'" })) response = await websocket.recv() data = json.loads(response) heading = data.get("result", "Error") await websocket.send(json.dumps({ "command": "screenshot" })) screenshot_response = await websocket.recv() screenshot_data = json.loads(screenshot_response) print(f"Site: {site_info['url']}") print(f" Heading: {heading}") print(f" Screenshot: {'✓' if screenshot_data.get('result') else '✗'}") print() tasks = [test_site(site) for site in test_sites] await asyncio.gather(*tasks) async def form_automation_demo(): """Demo: Fill forms in multiple windows.""" uri = "ws://localhost:8765" async with websockets.connect(uri) as websocket: await websocket.recv() html = """