"""Shared pytest fixtures and helpers for the browser-driven end-to-end tests.

Importing this module has side effects: it creates a temporary SQLite file
and exports the environment variables below, so that both the in-process
imports and the uvicorn subprocess started by ``app_server`` see the same
test configuration.
"""

import os
import subprocess
import sys
import tempfile
import time
import urllib.parse
import urllib.request
from pathlib import Path

import pytest

PROJECT_ROOT = Path(__file__).resolve().parent.parent
SCREENSHOT_DIR = Path("/tmp/devplace_test_screenshots")

# Options shared by every browser context the fixtures create.
_CONTEXT_OPTIONS = {
    "viewport": {"width": 1400, "height": 900},
    "permissions": ["clipboard-read", "clipboard-write"],
}
# Default Playwright action timeout applied to every page, in milliseconds.
_DEFAULT_TIMEOUT_MS = 15000


def _xdist_port():
    """Return the server port for this pytest-xdist worker.

    The controller process (or a run without xdist) uses 10501; worker
    ``gwN`` uses ``10501 + N``. An unparsable worker id falls back to 10501.
    """
    wid = os.environ.get("PYTEST_XDIST_WORKER", "master")
    if wid == "master":
        return 10501
    try:
        return 10501 + int(wid.replace("gw", ""))
    except ValueError:
        return 10501


PORT = _xdist_port()
BASE_URL = f"http://127.0.0.1:{PORT}"

# Created at import time so the path can be exported through the environment
# before anything else reads it.
_TEST_DB = tempfile.NamedTemporaryFile(suffix=f"_{PORT}.db", delete=False)
_TEST_DB.close()

os.environ["DEVPLACE_DATABASE_URL"] = f"sqlite:///{_TEST_DB.name}"
os.environ["SECRET_KEY"] = "test-secret-key"
os.environ["DEVPLACE_DISABLE_SERVICES"] = "1"
os.environ["DEVPLACE_RATE_LIMIT"] = "1000000"


def save_failure_screenshot(page, test_name):
    """Save a full-page screenshot of *page* as ``SCREENSHOT_DIR/<test_name>.png``.

    Best effort: nothing is captured when *page* is falsy or already closed,
    and any error raised while taking the screenshot is swallowed so that it
    cannot hide the original test failure.
    """
    SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
    try:
        if page and not page.is_closed():
            path = str(SCREENSHOT_DIR / f"{test_name}.png")
            page.screenshot(path=path, full_page=True)
    except Exception:
        pass


@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Capture screenshots of the test's pages when its call phase fails.

    Looks at the ``page``, ``alice`` and ``bob`` fixtures (the latter two are
    ``(page, user)`` tuples). Each distinct page object is captured once: the
    first one keeps the plain ``<nodeid>.png`` name and every further page gets
    a ``_<fixture>`` suffix, so one page's screenshot no longer overwrites
    another's.
    """
    outcome = yield
    report = outcome.get_result()
    if not (report.when == "call" and report.failed):
        return

    base_name = item.nodeid.replace("::", "_").replace("/", "_")
    captured = set()  # id() of pages already saved; `alice` reuses `page`
    for name in ("page", "alice", "bob"):
        if name not in item.funcargs:
            continue
        try:
            target = item.funcargs[name]
            if name in ("alice", "bob"):
                target = target[0]
            if id(target) in captured:
                continue
            shot_name = base_name if not captured else f"{base_name}_{name}"
            captured.add(id(target))
            save_failure_screenshot(target, shot_name)
        except Exception:
            # Deliberately best effort: reporting must never break the run.
            pass


@pytest.fixture(scope="session")
def test_db_path():
    """Yield the temporary SQLite file path and delete the file afterwards."""
    yield _TEST_DB.name
    try:
        os.unlink(_TEST_DB.name)
    except OSError:
        pass


def _stop_process(proc, timeout=10):
    """Terminate *proc*, escalating to kill if it does not exit in *timeout* s."""
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait(timeout=5)


@pytest.fixture(scope="session")
def app_server(test_db_path):
    """Run the application under uvicorn for the session and yield the process.

    The server is polled at ``BASE_URL`` for up to 30 seconds.

    Raises:
        RuntimeError: if the process exits during startup or does not answer
            within 30 seconds. The message carries the tail of the server log.
    """
    env = os.environ.copy()
    env["PYTHONUNBUFFERED"] = "1"
    log_file = tempfile.NamedTemporaryFile(
        suffix=f"_server_{PORT}.log", delete=False
    )

    def server_log():
        """Return at most the last 5000 characters of the server log."""
        try:
            with open(log_file.name, "r", errors="replace") as f:
                return f.read()[-5000:]
        except OSError:
            return ""

    proc = None
    try:
        proc = subprocess.Popen(
            [
                sys.executable, "-m", "uvicorn", "devplacepy.main:app",
                "--host", "127.0.0.1",
                "--port", str(PORT),
                "--log-level", "warning",
            ],
            cwd=str(PROJECT_ROOT),
            env=env,
            stdout=log_file,
            stderr=subprocess.STDOUT,
        )

        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + 30
        while time.monotonic() < deadline:
            try:
                with urllib.request.urlopen(f"{BASE_URL}/", timeout=2):
                    pass
                break
            except Exception:
                exit_code = proc.poll()
                if exit_code is not None:
                    proc.wait()
                    raise RuntimeError(
                        f"Server process died (exit code {exit_code}). "
                        f"Log:\n{server_log()}"
                    )
                time.sleep(0.5)
        else:
            # Stop the server first so the log tail in the message is complete.
            _stop_process(proc, timeout=5)
            raise RuntimeError(
                f"Server did not start after 30s. "
                f"Log:\n{server_log()}"
            )

        yield proc
    finally:
        # Runs on normal teardown and on every startup failure, so neither
        # the process nor the log file is left behind.
        if proc is not None:
            _stop_process(proc)
        try:
            log_file.close()
            os.unlink(log_file.name)
        except OSError:
            pass


@pytest.fixture(scope="session")
def playwright_instance():
    """Yield a sync Playwright driver that lives for the whole session."""
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        yield p


@pytest.fixture(scope="session")
def browser(playwright_instance):
    """Launch Chromium once per session.

    ``PLAYWRIGHT_HEADLESS`` (default ``"1"``) selects headless mode and
    ``PLAYWRIGHT_SLOW_MO`` (default ``"0"``) is passed as ``slow_mo``.
    """
    headless = os.environ.get("PLAYWRIGHT_HEADLESS", "1") == "1"
    b = playwright_instance.chromium.launch(
        headless=headless,
        slow_mo=int(os.environ.get("PLAYWRIGHT_SLOW_MO", "0")),
        args=["--window-size=1400,900"],
    )
    yield b
    b.close()


@pytest.fixture(scope="module")
def browser_context(browser):
    """Yield one browser context per test module, with clipboard permissions."""
    ctx = browser.new_context(**_CONTEXT_OPTIONS)
    yield ctx
    ctx.close()


@pytest.fixture
def page(browser_context):
    """Yield a fresh page in the module context, with cookies cleared first."""
    browser_context.clear_cookies()
    p = browser_context.new_page()
    try:
        p.set_default_timeout(_DEFAULT_TIMEOUT_MS)
        p.bring_to_front()
        yield p
    finally:
        p.close()


def signup_user(page, user):
    """Register *user* through the signup form and wait for the feed URL.

    *user* is a mapping with ``username``, ``email`` and ``password`` keys.
    """
    page.bring_to_front()
    page.goto(f"{BASE_URL}/auth/signup", wait_until="domcontentloaded")
    page.fill("#username", user["username"])
    page.fill("#email", user["email"])
    page.fill("#password", user["password"])
    page.fill("#confirm_password", user["password"])
    page.click("button:has-text('Create account')")
    page.wait_for_url("**/feed", timeout=10000)


def login_user(page, user):
    """Sign *user* in through the login form and wait for the feed URL.

    *user* is a mapping with ``email`` and ``password`` keys.
    """
    page.bring_to_front()
    page.goto(f"{BASE_URL}/auth/login", wait_until="domcontentloaded")
    page.fill("#email", user["email"])
    page.fill("#password", user["password"])
    page.click("button:has-text('Sign in')")
    page.wait_for_url("**/feed", timeout=10000, wait_until="domcontentloaded")


def assert_share_copies(page, expected_fragment):
    """Click the first share button and check its feedback and the clipboard.

    The button must show ``Copied!`` and then stop showing it. The clipboard
    is only checked when it can be read and is non-empty; in that case it must
    start with ``http`` and contain *expected_fragment*.
    """
    from playwright.sync_api import expect

    share = page.locator("button[data-share]").first
    share.scroll_into_view_if_needed()
    share.click()
    expect(share).to_have_text("Copied!", timeout=3000)
    expect(share).not_to_have_text("Copied!", timeout=3000)
    try:
        clip = page.evaluate("navigator.clipboard.readText()")
    except Exception:
        # Reading the clipboard may be refused; treat that as "nothing to check".
        clip = None
    if clip:
        assert clip.startswith("http") and expected_fragment in clip, f"clipboard={clip!r}"


@pytest.fixture(scope="session")
def seeded_db(app_server):
    """Create the test users once per session by POSTing the signup form.

    Uses ``urllib`` directly rather than a browser. Returns a mapping from
    ``"alice"`` / ``"bob"`` to dicts with ``username``, ``email`` and
    ``password`` keys.
    """
    users = {
        "alice": {
            "username": "alice_test",
            "email": "alice@test.devplace",
            "password": "secret123",
        },
        "bob": {
            "username": "bob_test",
            "email": "bob@test.devplace",
            "password": "secret456",
        },
    }
    for user in users.values():
        data = urllib.parse.urlencode({
            "username": user["username"],
            "email": user["email"],
            "password": user["password"],
            "confirm_password": user["password"],
        }).encode()
        req = urllib.request.Request(
            f"{BASE_URL}/auth/signup",
            data=data,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        # Close the response explicitly and bound the wait so that a stuck
        # server fails the session instead of hanging it.
        with urllib.request.urlopen(req, timeout=30):
            pass
    return users


@pytest.fixture
def alice(page, seeded_db):
    """Return ``(page, user)`` with alice logged in on the shared ``page``."""
    login_user(page, seeded_db["alice"])
    return page, seeded_db["alice"]


@pytest.fixture
def bob(browser, seeded_db):
    """Yield ``(page, user)`` with bob logged in inside his own context.

    The context and page are closed even when the login itself fails.
    """
    ctx = browser.new_context(**_CONTEXT_OPTIONS)
    try:
        p = ctx.new_page()
        try:
            p.set_default_timeout(_DEFAULT_TIMEOUT_MS)
            p.bring_to_front()
            login_user(p, seeded_db["bob"])
            yield p, seeded_db["bob"]
        finally:
            p.close()
    finally:
        ctx.close()


@pytest.fixture(scope="session")
def local_db():
    """Call the application's ``init_db()`` in-process and return its ``db``."""
    # Imported lazily so it happens after the environment above is set.
    from devplacepy.database import init_db, db

    init_db()
    return db