diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 0000000..5db2e28 --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,85 @@ +import devplacepy.cache as cache_mod +from devplacepy.cache import TTLCache + + +class FakeClock: + def __init__(self, start=1000.0): + self.now = start + + def time(self): + return self.now + + +def test_get_miss_returns_none(): + cache = TTLCache(ttl=60) + assert cache.get("absent") is None + + +def test_set_and_get_round_trip(): + cache = TTLCache(ttl=60) + cache.set("key", "value") + assert cache.get("key") == "value" + + +def test_entry_expires_after_ttl(monkeypatch): + clock = FakeClock() + monkeypatch.setattr(cache_mod, "time", clock) + cache = TTLCache(ttl=10) + cache.set("key", "value") + clock.now += 11 + assert cache.get("key") is None + + +def test_lru_evicts_oldest_when_over_max_size(): + cache = TTLCache(ttl=60, max_size=2) + cache.set("a", 1) + cache.set("b", 2) + cache.set("c", 3) + assert cache.get("a") is None + assert cache.get("b") == 2 + assert cache.get("c") == 3 + + +def test_get_promotes_recency(): + cache = TTLCache(ttl=60, max_size=2) + cache.set("a", 1) + cache.set("b", 2) + cache.get("a") + cache.set("c", 3) + assert cache.get("a") == 1 + assert cache.get("b") is None + assert cache.get("c") == 3 + + +def test_pop_removes_entry(): + cache = TTLCache(ttl=60) + cache.set("key", "value") + cache.pop("key") + assert cache.get("key") is None + + +def test_pop_missing_key_is_noop(): + cache = TTLCache(ttl=60) + cache.pop("absent") + + +def test_clear_empties_cache(): + cache = TTLCache(ttl=60) + cache.set("a", 1) + cache.set("b", 2) + cache.clear() + assert cache.get("a") is None + assert cache.get("b") is None + + +def test_items_excludes_expired_entries(monkeypatch): + clock = FakeClock() + monkeypatch.setattr(cache_mod, "time", clock) + cache = TTLCache(ttl=10) + cache.set("old", 1) + clock.now += 5 + cache.set("fresh", 2) + clock.now += 7 + items = dict(cache.items()) + assert "old" not in items + assert items["fresh"] == 2 diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..c315bd1 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,102 @@ +import argparse +from datetime import datetime, timezone, timedelta + +import pytest + +from devplacepy import cli +from devplacepy.database import get_table +from devplacepy.utils import generate_uid + + +def _make_user(role="Member"): + username = f"cli_{generate_uid()[:8]}" + get_table("users").insert({ + "uid": generate_uid(), + "username": username, + "email": f"{username}@t.dev", + "role": role, + "created_at": datetime.now(timezone.utc).isoformat(), + }) + return username + + +def test_role_get_prints_lowercased_role(local_db, capsys): + username = _make_user(role="Admin") + cli.cmd_role_get(argparse.Namespace(username=username)) + assert capsys.readouterr().out.strip() == "admin" + + +def test_role_get_missing_user_exits(local_db): + with pytest.raises(SystemExit): + cli.cmd_role_get(argparse.Namespace(username="cli_absent_xyz")) + + +def test_role_set_updates_role(local_db): + username = _make_user(role="Member") + cli.cmd_role_set(argparse.Namespace(username=username, role="admin")) + user = get_table("users").find_one(username=username) + assert user["role"] == "Admin" + + +def test_role_set_invalid_role_exits(local_db): + username = _make_user() + with pytest.raises(SystemExit): + cli.cmd_role_set(argparse.Namespace(username=username, role="superuser")) + + +def test_role_set_missing_user_exits(local_db): + with pytest.raises(SystemExit): + cli.cmd_role_set(argparse.Namespace(username="cli_absent_xyz", role="admin")) + + +def test_news_clear_deletes_all_tables(local_db, capsys): + for table in ("news", "news_images", "news_sync"): + get_table(table).insert({"uid": generate_uid(), "marker": "cli"}) + cli.cmd_news_clear(argparse.Namespace()) + for table in ("news", "news_images", "news_sync"): + assert get_table(table).count() == 0 + assert "News data cleared" in capsys.readouterr().out + + +def test_news_sanitize_strips_html(local_db, capsys): + uid = generate_uid() + get_table("news").insert({ + "uid": uid, + "description": "Bold & clean", + "content": "
Body
", + }) + cli.cmd_news_sanitize(argparse.Namespace()) + row = get_table("news").find_one(uid=uid) + assert row["description"] == "Bold & clean" + assert row["content"] == "Body" + assert "Sanitized" in capsys.readouterr().out + + +def test_attachments_prune_removes_only_stale_orphans(local_db, capsys): + old_uid = generate_uid() + fresh_uid = generate_uid() + attachments = get_table("attachments") + attachments.insert({ + "uid": old_uid, "target_type": "", "target_uid": "", + "created_at": "2000-01-01T00:00:00+00:00", + }) + attachments.insert({ + "uid": fresh_uid, "target_type": "", "target_uid": "", + "created_at": datetime.now(timezone.utc).isoformat(), + }) + cli.cmd_attachments_prune(argparse.Namespace(hours=24)) + assert attachments.find_one(uid=old_uid) is None + assert attachments.find_one(uid=fresh_uid) is not None + + +def test_main_without_command_exits(monkeypatch): + monkeypatch.setattr("sys.argv", ["devplace"]) + with pytest.raises(SystemExit): + cli.main() + + +def test_main_dispatches_subcommand(local_db, monkeypatch, capsys): + username = _make_user(role="Admin") + monkeypatch.setattr("sys.argv", ["devplace", "role", "get", username]) + cli.main() + assert capsys.readouterr().out.strip() == "admin" diff --git a/tests/test_content_unit.py b/tests/test_content_unit.py new file mode 100644 index 0000000..e0bc35c --- /dev/null +++ b/tests/test_content_unit.py @@ -0,0 +1,58 @@ +from datetime import datetime, timezone + +from devplacepy.content import is_owner, canonical_redirect, first_image_url, enrich_items +from devplacepy.database import get_table, get_users_by_uids +from devplacepy.utils import generate_uid + + +def test_is_owner_matches_user_uid(): + assert is_owner({"user_uid": "u1"}, {"uid": "u1"}) is True + assert is_owner({"user_uid": "u1"}, {"uid": "u2"}) is False + assert is_owner(None, {"uid": "u1"}) is False + assert is_owner({"user_uid": "u1"}, None) is False + + +def test_canonical_redirect_when_slug_differs(): + item = {"slug": "abcd1234-title", "uid": "abcd1234"} + assert canonical_redirect("posts", item, "abcd1234-title") is None + response = canonical_redirect("posts", item, "abcd1234") + assert response is not None + assert response.status_code == 301 + assert response.headers["location"] == "/posts/abcd1234-title" + + +def test_first_image_url_prefers_inline(): + assert first_image_url({"image": "pic.png"}, None) == "/static/uploads/pic.png" + + +def test_first_image_url_uses_first_image_attachment(): + item = {"image": None} + attachments = [{"is_image": False, "url": "/a"}, {"is_image": True, "url": "/b"}] + assert first_image_url(item, attachments) == "/b" + + +def test_first_image_url_none_when_absent(): + assert first_image_url({}, []) is None + + +def test_enrich_items_attaches_author_and_extras(local_db): + uid = generate_uid() + username = f"enrich_{uid[:8]}" + get_table("users").insert({ + "uid": uid, "username": username, "email": f"{username}@t.dev", + "created_at": datetime.now(timezone.utc).isoformat(), + }) + item_uid = generate_uid() + items = [{"uid": item_uid, "user_uid": uid, "created_at": datetime.now(timezone.utc).isoformat()}] + authors = get_users_by_uids([uid]) + enriched = enrich_items( + items, "post", authors, + extra_maps={"comment_count": {item_uid: 4}, "flag": lambda item: "ok"}, + ) + entry = enriched[0] + assert entry["post"] is items[0] + assert entry["author"]["username"] == username + assert entry["time_ago"] == "just now" + assert entry["my_vote"] == 0 + assert entry["comment_count"] == 4 + assert entry["flag"] == "ok" diff --git a/tests/test_db_helpers.py b/tests/test_db_helpers.py new file mode 100644 index 0000000..4f4efd0 --- /dev/null +++ b/tests/test_db_helpers.py @@ -0,0 +1,115 @@ +from datetime import datetime, timezone + +from devplacepy.database import ( + get_table, + get_users_by_uids, + get_vote_counts, + get_user_votes, + get_comment_counts_by_post_uids, + resolve_by_slug, + get_user_stars, + get_user_rank, + build_pagination, +) +from devplacepy.utils import generate_uid + + +def _now(): + return datetime.now(timezone.utc).isoformat() + + +def _user(): + uid = generate_uid() + get_table("users").insert({ + "uid": uid, "username": f"dbh_{uid[:8]}", "email": f"{uid[:8]}@t.dev", + "created_at": _now(), + }) + return uid + + +def _post(owner, **extra): + uid = generate_uid() + get_table("posts").insert({ + "uid": uid, "user_uid": owner, "slug": f"{uid[:8]}-post", + "title": None, "content": "db helper post body", "topic": "random", + "project_uid": None, "image": None, "stars": 0, "created_at": _now(), + **extra, + }) + return uid + + +def test_get_users_by_uids_dedupes(local_db): + a, b = generate_uid(), generate_uid() + users = get_table("users") + users.insert({"uid": a, "username": f"u_{a[:6]}", "created_at": _now()}) + users.insert({"uid": b, "username": f"u_{b[:6]}", "created_at": _now()}) + result = get_users_by_uids([a, b, a]) + assert set(result.keys()) == {a, b} + + +def test_get_vote_counts_groups_up_and_down(local_db): + target = generate_uid() + votes = get_table("votes") + for value in (1, 1, -1): + votes.insert({ + "uid": generate_uid(), "user_uid": generate_uid(), + "target_uid": target, "target_type": "post", "value": value, + "created_at": _now(), + }) + ups, downs = get_vote_counts([target]) + assert ups[target] == 2 + assert downs[target] == 1 + + +def test_get_user_votes_returns_user_value(local_db): + target = generate_uid() + user = generate_uid() + get_table("votes").insert({ + "uid": generate_uid(), "user_uid": user, + "target_uid": target, "target_type": "post", "value": 1, "created_at": _now(), + }) + assert get_user_votes(user, [target]) == {target: 1} + + +def test_get_comment_counts_by_post_uids(local_db): + post = generate_uid() + comments = get_table("comments") + for _ in range(2): + comments.insert({ + "uid": generate_uid(), "user_uid": generate_uid(), + "target_type": "post", "target_uid": post, "content": "hi", "created_at": _now(), + }) + assert get_comment_counts_by_post_uids([post]) == {post: 2} + + +def test_resolve_by_slug_finds_by_slug_or_uid(local_db): + owner = _user() + uid = _post(owner) + posts = get_table("posts") + slug = posts.find_one(uid=uid)["slug"] + assert resolve_by_slug(posts, slug)["uid"] == uid + assert resolve_by_slug(posts, uid)["uid"] == uid + assert resolve_by_slug(posts, "missing-slug-xyz") is None + + +def test_get_user_stars_sums_content_stars(local_db): + user = _user() + _post(user, stars=7) + assert get_user_stars(user) == 7 + + +def test_get_user_rank_unknown_user_is_none(local_db): + assert get_user_rank(generate_uid()) is None + + +def test_build_pagination_metadata(): + first = build_pagination(1, 100, 25) + assert first["total_pages"] == 4 + assert first["has_prev"] is False + assert first["has_next"] is True + middle = build_pagination(2, 100, 25) + assert middle["has_prev"] is True + assert middle["has_next"] is True + clamped = build_pagination(99, 10, 25) + assert clamped["page"] == 1 + assert clamped["total_pages"] == 1 diff --git a/tests/test_follow_api.py b/tests/test_follow_api.py new file mode 100644 index 0000000..58ecd34 --- /dev/null +++ b/tests/test_follow_api.py @@ -0,0 +1,75 @@ +import time + +import requests + +from tests.conftest import BASE_URL +from devplacepy.database import get_table + +_counter = [0] + + +def _session(): + _counter[0] += 1 + name = f"flw{int(time.time() * 1000)}{_counter[0]}" + s = requests.Session() + s.post(f"{BASE_URL}/auth/signup", data={ + "username": name, "email": f"{name}@t.dev", + "password": "secret123", "confirm_password": "secret123", + }, allow_redirects=True) + return s, name + + +def _uid(username): + return get_table("users").find_one(username=username)["uid"] + + +def test_follow_creates_notification_and_awards_xp(app_server): + s_a, a_name = _session() + _, b_name = _session() + a_uid, b_uid = _uid(a_name), _uid(b_name) + before = get_table("users").find_one(uid=b_uid).get("xp", 0) or 0 + + s_a.post(f"{BASE_URL}/follow/{b_name}", allow_redirects=False) + + assert get_table("follows").count(follower_uid=a_uid, following_uid=b_uid) == 1 + assert get_table("notifications").count(user_uid=b_uid, type="follow") == 1 + after = get_table("users").find_one(uid=b_uid).get("xp", 0) or 0 + assert after - before == 5 + + +def test_duplicate_follow_is_idempotent(app_server): + s_a, a_name = _session() + _, b_name = _session() + a_uid, b_uid = _uid(a_name), _uid(b_name) + + s_a.post(f"{BASE_URL}/follow/{b_name}", allow_redirects=False) + s_a.post(f"{BASE_URL}/follow/{b_name}", allow_redirects=False) + + assert get_table("follows").count(follower_uid=a_uid, following_uid=b_uid) == 1 + assert get_table("notifications").count(user_uid=b_uid, type="follow") == 1 + + +def test_self_follow_rejected(app_server): + s_a, a_name = _session() + a_uid = _uid(a_name) + s_a.post(f"{BASE_URL}/follow/{a_name}", allow_redirects=False) + assert get_table("follows").count(follower_uid=a_uid, following_uid=a_uid) == 0 + + +def test_follow_nonexistent_user_redirects(app_server): + s_a, _ = _session() + r = s_a.post(f"{BASE_URL}/follow/no_such_user_zzz", allow_redirects=False) + assert r.status_code == 302 + + +def test_unfollow_removes_follow(app_server): + s_a, a_name = _session() + _, b_name = _session() + a_uid, b_uid = _uid(a_name), _uid(b_name) + + s_a.post(f"{BASE_URL}/follow/{b_name}", allow_redirects=False) + s_a.post(f"{BASE_URL}/follow/unfollow/{b_name}", allow_redirects=False) + assert get_table("follows").count(follower_uid=a_uid, following_uid=b_uid) == 0 + + r = s_a.post(f"{BASE_URL}/follow/unfollow/{b_name}", allow_redirects=False) + assert r.status_code == 302 diff --git a/tests/test_news_service.py b/tests/test_news_service.py new file mode 100644 index 0000000..3ce45f2 --- /dev/null +++ b/tests/test_news_service.py @@ -0,0 +1,219 @@ +import asyncio + +import httpx + +from devplacepy.services import news as news_mod +from devplacepy.services.news import NewsService, _extract_grade, _get_ai_key, _get_article_images +from devplacepy.database import get_table +from devplacepy.utils import generate_uid + +API_URL = "http://news.test/api" +AI_URL = "http://ai.test/v1/chat" +LINK_HIGH = "http://news.test/high" +LINK_LOW = "http://news.test/low" + + +class FakeResp: + def __init__(self, json_data=None, text="", status=200): + self._json = json_data + self.text = text + self.status_code = status + + def raise_for_status(self): + if self.status_code >= 400: + raise httpx.HTTPError(f"status {self.status_code}") + + def json(self): + return self._json + + +class FakeClient: + def __init__(self, articles): + self.articles = articles + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + async def get(self, url, timeout=None): + if url == API_URL: + return FakeResp(json_data={"articles": self.articles}) + return FakeResp(text='
')
+
+ async def post(self, url, json=None, headers=None, timeout=None):
+ prompt = json["messages"][0]["content"]
+ if "FailArticle" in prompt:
+ return FakeResp(status=500, text="err")
+ if "EmptyArticle" in prompt:
+ return FakeResp(json_data={"choices": [{"message": {"content": ""}}]})
+ if "BadArticle" in prompt:
+ return FakeResp(json_data={"choices": [{"message": {"content": "no number"}}]})
+ grade = "9" if "HighArticle" in prompt else "3"
+ return FakeResp(json_data={"choices": [{"message": {"content": grade}}]})
+
+
+class FailingApiClient(FakeClient):
+ async def get(self, url, timeout=None):
+ if url == API_URL:
+ raise httpx.HTTPError("api down")
+ return FakeResp(text="")
+
+
+def _settings_stub(threshold="7"):
+ def fake_get_setting(key, default=None):
+ return {
+ "news_api_url": API_URL,
+ "news_ai_url": AI_URL,
+ "news_ai_model": "test-model",
+ "news_grade_threshold": threshold,
+ "news_ai_key": "",
+ }.get(key, default)
+ return fake_get_setting
+
+
+def test_extract_grade_parsing():
+ assert _extract_grade("8") == 8
+ assert _extract_grade("Grade: 9") == 9
+ assert _extract_grade("Score is 7/10") == 7
+ assert _extract_grade("0") is None
+ assert _extract_grade("11") is None
+ assert _extract_grade("not a number") is None
+
+
+def test_get_ai_key_env_precedence(monkeypatch):
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ monkeypatch.setenv("NEWS_AI_KEY", "primary")
+ assert _get_ai_key() == "primary"
+
+
+def test_get_ai_key_setting_fallback(monkeypatch):
+ monkeypatch.delenv("NEWS_AI_KEY", raising=False)
+ monkeypatch.setattr(news_mod, "get_setting", lambda key, default=None: "from-setting" if key == "news_ai_key" else default)
+ assert _get_ai_key() == "from-setting"
+
+
+def test_get_ai_key_openrouter_fallback(monkeypatch):
+ monkeypatch.delenv("NEWS_AI_KEY", raising=False)
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ monkeypatch.setenv("OPENROUTER_API_KEY", "router-key")
+ monkeypatch.delenv("OPENAI_API_KEY", raising=False)
+ assert _get_ai_key() == "router-key"
+
+
+def test_get_ai_key_openai_fallback(monkeypatch):
+ monkeypatch.delenv("NEWS_AI_KEY", raising=False)
+ monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ monkeypatch.setenv("OPENAI_API_KEY", "openai-key")
+ assert _get_ai_key() == "openai-key"
+
+
+def test_get_ai_key_empty_when_unset(monkeypatch):
+ monkeypatch.delenv("NEWS_AI_KEY", raising=False)
+ monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
+ monkeypatch.delenv("OPENAI_API_KEY", raising=False)
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ assert _get_ai_key() == ""
+
+
+def test_grade_article_empty_content_returns_none(local_db, monkeypatch):
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ grade = asyncio.run(NewsService()._grade_article(
+ {"title": "EmptyArticle", "description": "d", "content": "c"}, AI_URL, "m", FakeClient([])))
+ assert grade is None
+
+
+def test_grade_article_unparseable_returns_none(local_db, monkeypatch):
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ grade = asyncio.run(NewsService()._grade_article(
+ {"title": "BadArticle", "description": "d", "content": "c"}, AI_URL, "m", FakeClient([])))
+ assert grade is None
+
+
+def test_run_once_handles_api_failure(local_db, monkeypatch):
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub())
+ monkeypatch.setattr(news_mod.httpx, "AsyncClient", lambda *a, **k: FailingApiClient([]))
+ asyncio.run(NewsService().run_once())
+
+
+def test_run_once_updates_existing_news_row(local_db, monkeypatch):
+ external_id = f"news-{generate_uid()}"
+ existing_uid = generate_uid()
+ get_table("news").insert({
+ "uid": existing_uid, "external_id": external_id, "slug": "",
+ "title": "Old Title", "status": "draft", "grade": 0, "synced_at": "2020-01-01",
+ })
+ articles = [{"guid": external_id, "title": "HighArticle", "description": "d", "content": "c",
+ "link": "", "feed_name": "Feed", "author": "A", "published": "2026-01-01"}]
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub(threshold="7"))
+ monkeypatch.setattr(news_mod.httpx, "AsyncClient", lambda *a, **k: FakeClient(articles))
+
+ asyncio.run(NewsService().run_once())
+
+ row = get_table("news").find_one(uid=existing_uid)
+ assert row["title"] == "HighArticle"
+ assert row["status"] == "published"
+ assert get_table("news").count(external_id=external_id) == 1
+
+
+def test_get_article_images_filters_and_dedupes():
+ html = (
+ '
'
+ "
"
+ '
'
+ )
+ client = FakeClient([])
+
+ async def fake_get(url, timeout=None):
+ return FakeResp(text=html)
+
+ client.get = fake_get
+ images = asyncio.run(_get_article_images("http://news.test/page", client))
+ assert [img["url"] for img in images] == ["http://img.test/a.png"]
+
+
+def test_get_article_images_network_error_returns_empty():
+ client = FakeClient([])
+
+ async def failing_get(url, timeout=None):
+ raise httpx.HTTPError("down")
+
+ client.get = failing_get
+ assert asyncio.run(_get_article_images("http://news.test/page", client)) == []
+
+
+def test_run_once_publishes_grades_and_is_idempotent(local_db, monkeypatch):
+ g_high, g_low, g_fail = (f"news-{generate_uid()}" for _ in range(3))
+ articles = [
+ {"guid": g_high, "title": "HighArticle", "description": "d", "content": "c",
+ "link": LINK_HIGH, "feed_name": "Feed", "author": "A", "published": "2026-01-01"},
+ {"guid": g_low, "title": "LowArticle", "description": "d", "content": "c",
+ "link": LINK_LOW, "feed_name": "Feed", "author": "A", "published": "2026-01-01"},
+ {"guid": g_fail, "title": "FailArticle", "description": "d", "content": "c",
+ "link": "", "feed_name": "Feed", "author": "A", "published": "2026-01-01"},
+ {"guid": "", "title": "NoGuid", "description": "d", "content": "c",
+ "link": "", "feed_name": "Feed", "author": "A", "published": "2026-01-01"},
+ ]
+ monkeypatch.setattr(news_mod, "get_setting", _settings_stub(threshold="7"))
+ monkeypatch.setattr(news_mod.httpx, "AsyncClient", lambda *a, **k: FakeClient(articles))
+
+ asyncio.run(NewsService().run_once())
+
+ news = get_table("news")
+ sync = get_table("news_sync")
+ assert news.find_one(external_id=g_high)["status"] == "published"
+ assert news.find_one(external_id=g_high)["grade"] == 9
+ assert news.find_one(external_id=g_low)["status"] == "draft"
+ assert news.find_one(external_id=g_low)["grade"] == 3
+ fail_row = news.find_one(external_id=g_fail)
+ assert fail_row["status"] == "draft"
+ assert fail_row["grade"] == 0
+ assert sync.find_one(external_id=g_fail)["status"] == "grading_failed"
+ assert sync.find_one(external_id=g_high)["status"] == "graded"
+
+ asyncio.run(NewsService().run_once())
+ assert news.count(external_id=g_high) == 1
+ assert news.count(external_id=g_fail) == 1
diff --git a/tests/test_votes.py b/tests/test_votes.py
new file mode 100644
index 0000000..ef42010
--- /dev/null
+++ b/tests/test_votes.py
@@ -0,0 +1,101 @@
+import time
+from datetime import datetime, timezone
+
+import requests
+
+from tests.conftest import BASE_URL
+from devplacepy.database import get_table
+from devplacepy.utils import generate_uid
+
+_counter = [0]
+AJAX = {"X-Requested-With": "fetch"}
+
+
+def _session():
+ _counter[0] += 1
+ name = f"vot{int(time.time() * 1000)}{_counter[0]}"
+ s = requests.Session()
+ s.post(f"{BASE_URL}/auth/signup", data={
+ "username": name, "email": f"{name}@t.dev",
+ "password": "secret123", "confirm_password": "secret123",
+ }, allow_redirects=True)
+ return s, name
+
+
+def _uid(username):
+ return get_table("users").find_one(username=username)["uid"]
+
+
+def _make_post(owner_uid):
+ uid = generate_uid()
+ get_table("posts").insert({
+ "uid": uid, "user_uid": owner_uid, "slug": f"{uid[:8]}-vote-post",
+ "title": None, "content": "vote target content", "topic": "random",
+ "project_uid": None, "image": None, "stars": 0,
+ "created_at": datetime.now(timezone.utc).isoformat(),
+ })
+ return uid
+
+
+def test_upvote_returns_ajax_payload(app_server):
+ s_a, a_name = _session()
+ s_b, _ = _session()
+ post_uid = _make_post(_uid(a_name))
+ r = s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ assert r.json() == {"net": 1, "up": 1, "down": 0, "value": 1}
+
+
+def test_repeated_vote_toggles_off(app_server):
+ s_a, a_name = _session()
+ s_b, _ = _session()
+ post_uid = _make_post(_uid(a_name))
+ s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ r = s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ assert r.json()["net"] == 0
+ assert r.json()["value"] == 0
+
+
+def test_switch_upvote_to_downvote(app_server):
+ s_a, a_name = _session()
+ s_b, _ = _session()
+ post_uid = _make_post(_uid(a_name))
+ s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ r = s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "-1"}, headers=AJAX)
+ assert r.json()["net"] == -1
+ assert r.json()["value"] == -1
+
+
+def test_upvote_notifies_owner_and_awards_xp(app_server):
+ s_a, a_name = _session()
+ s_b, _ = _session()
+ a_uid = _uid(a_name)
+ post_uid = _make_post(a_uid)
+ before = get_table("users").find_one(uid=a_uid).get("xp", 0) or 0
+ s_b.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ assert get_table("notifications").count(user_uid=a_uid, type="vote") == 1
+ after = get_table("users").find_one(uid=a_uid).get("xp", 0) or 0
+ assert after - before == 5
+
+
+def test_self_vote_does_not_notify(app_server):
+ s_a, a_name = _session()
+ a_uid = _uid(a_name)
+ post_uid = _make_post(a_uid)
+ before = get_table("notifications").count(user_uid=a_uid, type="vote")
+ s_a.post(f"{BASE_URL}/votes/post/{post_uid}", data={"value": "1"}, headers=AJAX)
+ after = get_table("notifications").count(user_uid=a_uid, type="vote")
+ assert after == before
+
+
+def test_non_ajax_vote_redirects_to_referer(app_server):
+ s_a, a_name = _session()
+ s_b, _ = _session()
+ post_uid = _make_post(_uid(a_name))
+ r = s_b.post(
+ f"{BASE_URL}/votes/post/{post_uid}",
+ data={"value": "1"},
+ headers={"Referer": f"{BASE_URL}/feed"},
+ allow_redirects=False,
+ )
+ assert r.status_code == 302
+ assert r.headers["location"] == f"{BASE_URL}/feed"