"""Locust load-test scenarios for a DevPlace instance.

On Locust's ``init`` event the script seeds the target with users, posts,
comments, projects, gists, news articles and an admin account, and records
the identifiers it discovers in the module-level registries below.  The
``DevPlaceUser`` and ``AdminUser`` classes then pick random entries from
those registries to drive traffic.
"""

import logging
import os
import random
import re
import uuid

from locust import HttpUser, task, between, events

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("locust")

# ── shared registries (filled by seed_data and by running tasks) ────────
SEED_USERS = []         # dicts with "username" and "email" keys
ALL_USERNAMES = []      # every username this run knows about
USER_UIDS = []
POST_UIDS = []
POST_SLUGS = []
PROJECT_UIDS = []
PROJECT_SLUGS = []
COMMENT_UIDS = []
NEWS_UIDS = []
NEWS_SLUGS = []
GIST_SLUGS = []
GIST_UIDS = []
NOTIFICATION_UIDS = []
ADMIN_USER = {}         # "username"/"email" of the seeded admin, if any
ADMIN_TARGETS = {}      # "news_uid"/"disposable_uid" used by admin mutations

TOPICS = ["devlog", "showcase", "question", "rant", "fun", "random"]
PROJECT_TYPES = ["game", "game_asset", "software", "mobile_app", "website"]
GIST_LANGUAGES = ["python", "javascript", "go", "rust", "bash", "sql", "json"]
UPLOAD_FILE = ("load_test.txt", b"locust upload payload", "text/plain")
AVATAR_STYLES = [
    "adventurer",
    "adventurer-neutral",
    "avataaars",
    "bottts",
    "identicon",
    "initials",
    "lorelei",
    "micah",
    "open-peeps",
    "pixel-art",
    "shapes",
]
UUID_RE = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}'
PASSWORD = "testpass123"

# Number of seed accounts to create, and how many signup attempts may be
# spent trying before seeding gives up (prevents an endless loop when the
# target host is unreachable).
SEED_USER_COUNT = 5
MAX_SEED_SIGNUP_ATTEMPTS = 15


def random_username():
    """Return a fresh ``lu_``-prefixed username with 10 random hex chars."""
    return f"lu_{uuid.uuid4().hex[:10]}"


def _extract_slug(url, section):
    """Return the path segment that follows ``/<section>/`` in *url*.

    Returns ``None`` when the URL has no such segment or when the segment
    is ``create`` (i.e. the response is the creation form itself rather
    than a created item).  Query strings and fragments are not part of the
    returned slug.
    """
    match = re.search(rf"/{section}/([^/?#\s]+)", url or "")
    if match is None or match.group(1) == "create":
        return None
    return match.group(1)


def do_signup(client, username, email):
    """POST the signup form through *client* and report the outcome.

    The request counts as successful only when the final status is 200
    and at least one redirect was followed.  Returns ``True`` on success,
    ``False`` otherwise (the Locust entry is marked accordingly).
    """
    form = {
        "username": username,
        "email": email,
        "password": PASSWORD,
        "confirm_password": PASSWORD,
    }
    with client.post(
        "/auth/signup", data=form, catch_response=True, name="signup"
    ) as resp:
        if resp.status_code == 200 and resp.history:
            resp.success()
            return True
        resp.failure(f"signup failed: status={resp.status_code}")
        return False


def do_login(client, email):
    """POST the login form through *client* and report the outcome.

    Uses the same success rule as ``do_signup``: status 200 reached via a
    redirect.  Returns ``True`` on success, ``False`` otherwise.
    """
    form = {"email": email, "password": PASSWORD}
    with client.post(
        "/auth/login", data=form, catch_response=True, name="login"
    ) as resp:
        if resp.status_code == 200 and resp.history:
            resp.success()
            return True
        resp.failure(f"login failed: status={resp.status_code}")
        return False


@events.init.add_listener
def seed_data(environment, **kwargs):
    """Populate the target host and the module registries before the run.

    Steps, each best-effort (failures are logged as warnings, never
    raised): create seed users over HTTP, create posts/comments/projects/
    gists as some of them, insert news articles and an admin plus a
    disposable user directly through ``devplacepy``'s database helpers,
    then harvest UIDs and slugs from rendered HTML.

    Args:
        environment: Locust environment; only ``environment.host`` is read.
        **kwargs: Ignored extra event arguments.
    """
    # Imports are kept function-local, as in the original layout.
    import urllib.request
    import urllib.parse
    import http.cookiejar

    # Strip a trailing slash so f"{host}/path" never yields "//path".
    host = (environment.host or "http://127.0.0.1:10502").rstrip("/")
    logger.info(f"Seeding {host}...")

    def encode_form(fields):
        """URL-encode *fields* into a bytes request body."""
        return urllib.parse.urlencode(fields).encode()

    def logged_in_opener(email):
        """Return a cookie-keeping opener that has posted the login form."""
        jar = http.cookiejar.CookieJar()
        opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(jar)
        )
        request = urllib.request.Request(
            f"{host}/auth/login",
            data=encode_form({"email": email, "password": PASSWORD}),
        )
        # Only the cookies matter here; close the response right away.
        with opener.open(request, timeout=10):
            pass
        return opener

    def fetch(opener, path, fields=None):
        """Request *path* (POST when *fields* is given, else GET).

        Returns ``(final_url, body_text)`` and always closes the response.
        """
        data = encode_form(fields) if fields is not None else None
        request = urllib.request.Request(f"{host}{path}", data=data)
        with opener.open(request, timeout=10) as resp:
            return resp.url, resp.read().decode("utf-8", errors="replace")

    # ── Seed users (HTTP signup) ────────────────────────────────
    created = 0
    attempts = 0
    while created < SEED_USER_COUNT and attempts < MAX_SEED_SIGNUP_ATTEMPTS:
        attempts += 1
        username = random_username()
        email = f"{username}@locust.devplace"
        request = urllib.request.Request(
            f"{host}/auth/signup",
            data=encode_form({
                "username": username,
                "email": email,
                "password": PASSWORD,
                "confirm_password": PASSWORD,
            }),
        )
        try:
            with urllib.request.urlopen(request, timeout=10):
                pass
        except Exception as e:
            logger.warning(f"Create seed user failed: {e}")
            continue
        SEED_USERS.append({"username": username, "email": email})
        ALL_USERNAMES.append(username)
        created += 1
    if created < SEED_USER_COUNT:
        logger.warning(
            f"Gave up seeding users after {attempts} attempts "
            f"({created}/{SEED_USER_COUNT} created)"
        )
    logger.info(f"Created {len(SEED_USERS)} seed users")

    # ── Seed posts ──────────────────────────────────────────────
    for seed in SEED_USERS[:3]:
        try:
            opener = logged_in_opener(seed["email"])
            final_url, html = fetch(opener, "/posts/create", {
                "content": "x " * 100,
                "title": f"Seed post {uuid.uuid4().hex[:6]}",
                "topic": random.choice(TOPICS),
            })
            slug = _extract_slug(final_url, "posts")
            if slug and slug not in POST_SLUGS:
                POST_SLUGS.append(slug)
            m2 = re.search(rf'/votes/post/({UUID_RE})', html)
            if m2 and m2.group(1) not in POST_UIDS:
                POST_UIDS.append(m2.group(1))
        except Exception as e:
            logger.warning(f"Create seed post failed: {e}")
    logger.info(f"Created {len(POST_UIDS)} seed posts")

    # ── Seed comments (all on the first seeded post) ────────────
    if POST_UIDS:
        for seed in SEED_USERS[:2]:
            try:
                opener = logged_in_opener(seed["email"])
                fetch(opener, "/comments/create", {
                    "content": f"Seed comment by {seed['username']}",
                    "post_uid": POST_UIDS[0],
                })
            except Exception as e:
                logger.warning(f"Create seed comment failed: {e}")
    else:
        logger.warning("Create seed comment skipped: no seed post UID available")

    # ── Seed projects ───────────────────────────────────────────
    for seed in SEED_USERS[:2]:
        try:
            opener = logged_in_opener(seed["email"])
            fetch(opener, "/projects/create", {
                "title": f"Project {uuid.uuid4().hex[:6]}",
                "description": "z " * 50,
                "project_type": random.choice(PROJECT_TYPES),
            })
        except Exception as e:
            logger.warning(f"Create seed project failed: {e}")

    # ── Seed gists ──────────────────────────────────────────────
    for seed in SEED_USERS[:2]:
        try:
            opener = logged_in_opener(seed["email"])
            final_url, html = fetch(opener, "/gists/create", {
                "title": f"Gist {uuid.uuid4().hex[:6]}",
                "description": "Seed gist for load testing.",
                "source_code": "print('hello')\n" * 5,
                "language": random.choice(GIST_LANGUAGES),
            })
            slug = _extract_slug(final_url, "gists")
            if slug and slug not in GIST_SLUGS:
                GIST_SLUGS.append(slug)
            m2 = re.search(rf'/votes/gist/({UUID_RE})', html)
            if m2 and m2.group(1) not in GIST_UIDS:
                GIST_UIDS.append(m2.group(1))
        except Exception as e:
            logger.warning(f"Create seed gist failed: {e}")
    logger.info(f"Created {len(GIST_SLUGS)} seed gists")

    # ── Seed news articles (direct DB insert) ───────────────────
    try:
        from devplacepy.database import get_table, init_db
        from devplacepy.utils import make_combined_slug

        init_db()
        news_table = get_table("news")
        for i in range(3):
            title = f"Locust news article {i} {uuid.uuid4().hex[:4]}"
            article_uid = str(uuid.uuid4())
            slug = make_combined_slug(title, article_uid)
            # NOTE(review): external_id is the same on every run; if the
            # table enforces uniqueness, re-runs will fail here — confirm.
            news_table.insert({
                "uid": article_uid,
                "slug": slug,
                "title": title,
                "external_id": f"locust_seed_{i}",
                "grade": 8,
                "status": "published",
                "show_on_landing": 1,
                "source_name": "Locust",
                "synced_at": "2026-01-01T00:00:00",
                "description": "Locust seed article for load testing.",
            })
            NEWS_UIDS.append(article_uid)
            NEWS_SLUGS.append(slug)

        # A separate article that only the admin mutation tasks touch.
        admin_news_uid = str(uuid.uuid4())
        admin_news_title = f"Admin target news {uuid.uuid4().hex[:4]}"
        news_table.insert({
            "uid": admin_news_uid,
            "slug": make_combined_slug(admin_news_title, admin_news_uid),
            "title": admin_news_title,
            "external_id": f"locust_admin_target_{uuid.uuid4().hex[:6]}",
            "grade": 5,
            "status": "published",
            "show_on_landing": 0,
            "source_name": "Locust",
            "synced_at": "2026-01-01T00:00:00",
            "description": "Dedicated target for admin news mutations.",
        })
        ADMIN_TARGETS["news_uid"] = admin_news_uid
    except Exception as e:
        logger.warning(f"Seed news articles failed: {e}")

    # ── Seed an admin user (direct DB insert) ───────────────────
    try:
        from datetime import datetime, timezone
        from devplacepy.database import get_table, init_db
        from devplacepy.utils import hash_password, generate_uid

        init_db()
        username = f"lu_admin_{uuid.uuid4().hex[:6]}"
        email = f"{username}@locust.devplace"
        get_table("users").insert({
            "uid": generate_uid(),
            "username": username,
            "email": email,
            "password_hash": hash_password(PASSWORD),
            "bio": "",
            "location": "",
            "git_link": "",
            "website": "",
            "role": "Admin",
            "is_active": True,
            "level": 1,
            "xp": 0,
            "stars": 0,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        ADMIN_USER["username"] = username
        ADMIN_USER["email"] = email
        ALL_USERNAMES.append(username)

        # Throw-away member account for the admin user-management tasks.
        disposable_uid = generate_uid()
        disposable_name = f"lu_target_{uuid.uuid4().hex[:6]}"
        get_table("users").insert({
            "uid": disposable_uid,
            "username": disposable_name,
            "email": f"{disposable_name}@locust.devplace",
            "password_hash": hash_password(PASSWORD),
            "bio": "",
            "location": "",
            "git_link": "",
            "website": "",
            "role": "Member",
            "is_active": True,
            "level": 1,
            "xp": 0,
            "stars": 0,
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        ADMIN_TARGETS["disposable_uid"] = disposable_uid
        logger.info(f"Created admin user {username} and disposable target")
    except Exception as e:
        logger.warning(f"Seed admin user failed: {e}")

    # ── Harvest UIDs from HTML responses ──────────────────────
    # projects sidebar:  /projects?user_uid={uuid}
    # project cards:     /votes/project/{uuid}
    # comment deletion:  /comments/delete/{uuid}
    uuid_pat = rf'({UUID_RE})'
    for seed in SEED_USERS:
        try:
            opener = logged_in_opener(seed["email"])
            _, html = fetch(opener, "/projects")
            m = re.search(rf'/projects\?user_uid={uuid_pat}', html)
            if m and m.group(1) not in USER_UIDS:
                USER_UIDS.append(m.group(1))
            proj_matches = re.findall(rf'/votes/project/{uuid_pat}', html)
            PROJECT_UIDS.extend(
                p for p in proj_matches if p not in PROJECT_UIDS
            )
            slug_matches = re.findall(r'href="/projects/([^"/?#]+)"', html)
            PROJECT_SLUGS.extend(
                s for s in slug_matches
                if s not in PROJECT_SLUGS and s not in ("create",)
            )
        except Exception as e:
            logger.warning(f"Harvest from {seed['username']} failed: {e}")

    if POST_UIDS:
        try:
            opener = logged_in_opener(SEED_USERS[0]["email"])
            _, html = fetch(opener, f"/posts/{POST_UIDS[0]}")
            matches = re.findall(rf'/comments/delete/{uuid_pat}', html)
            COMMENT_UIDS.extend(m for m in matches if m not in COMMENT_UIDS)
        except Exception as e:
            logger.warning(f"Harvest comment UIDs failed: {e}")

    logger.info(
        f"Seed complete: {len(SEED_USERS)} users, {len(POST_UIDS)} posts, "
        f"{len(PROJECT_UIDS)} projects ({len(PROJECT_SLUGS)} slugs), "
        f"{len(GIST_UIDS)} gists, {len(COMMENT_UIDS)} comments, "
        f"{len(USER_UIDS)} uids"
    )


class DevPlaceUser(HttpUser):
    """Regular site visitor: browses, creates content and interacts.

    About 30% of instances log in as an existing seed user; the rest sign
    up with a fresh account.  Slugs of items created by this instance are
    tracked so the owner-scoped edit/delete tasks have valid targets.
    """

    weight = 20
    wait_time = between(1, 5)

    def on_start(self):
        """Authenticate (login or signup) and initialise per-user state."""
        self.own_post_slugs = []
        self.own_gist_slugs = []
        self.own_project_slugs = []
        if random.random() < 0.3 and SEED_USERS:
            seed = random.choice(SEED_USERS)
            do_login(self.client, seed["email"])
            self.username = seed["username"]
        else:
            username = random_username()
            email = f"{username}@locust.devplace"
            signed_up = do_signup(self.client, username, email)
            self.username = username
            # Only advertise accounts that exist, so other users' profile
            # and follow tasks do not target usernames that never signed up.
            if signed_up:
                ALL_USERNAMES.append(username)

    # ── browsing ────────────────────────────────────────────────

    @task(5)
    def view_feed(self):
        """GET the feed with a random tab and an optional topic filter."""
        tab = random.choice(["all", "trending", "recent", "following"])
        topic = random.choice([None] + TOPICS)
        params = {"tab": tab}
        if topic:
            params["topic"] = topic
        self.client.get("/feed", params=params, name="feed")

    @task(4)
    def view_post(self):
        """GET a random known post, preferring slugs over UIDs."""
        slugs = POST_SLUGS if POST_SLUGS else POST_UIDS
        if not slugs:
            return
        self.client.get(
            f"/posts/{random.choice(slugs)}", name="posts/[uid]"
        )

    @task(3)
    def view_profile(self):
        if not ALL_USERNAMES:
            return
        self.client.get(
            f"/profile/{random.choice(ALL_USERNAMES)}",
            name="profile/[username]",
        )

    @task(3)
    def view_projects(self):
        self.client.get("/projects", params={
            "tab": random.choice(["recent", "released", "popular", "new"]),
        }, name="projects")

    @task(1)
    def view_landing(self):
        self.client.get("/", name="landing")

    @task(1)
    def view_auth_forms(self):
        self.client.get(
            random.choice(["/auth/signup", "/auth/login"]), name="auth/form"
        )

    @task(3)
    def view_news(self):
        self.client.get("/news", name="news")

    @task(2)
    def view_news_detail(self):
        """GET a random seeded news article, preferring slugs over UIDs."""
        slugs = NEWS_SLUGS if NEWS_SLUGS else NEWS_UIDS
        if not slugs:
            return
        self.client.get(
            f"/news/{random.choice(slugs)}", name="news/[slug]"
        )

    @task(1)
    def view_bugs(self):
        self.client.get("/bugs", name="bugs")

    @task(3)
    def view_project_detail(self):
        if not PROJECT_SLUGS:
            return
        self.client.get(
            f"/projects/{random.choice(PROJECT_SLUGS)}",
            name="projects/[slug]",
        )

    @task(2)
    def view_gists(self):
        self.client.get("/gists", name="gists")

    @task(2)
    def view_gist_detail(self):
        if not GIST_SLUGS:
            return
        self.client.get(
            f"/gists/{random.choice(GIST_SLUGS)}", name="gists/[slug]"
        )

    @task(1)
    def search_profiles(self):
        """Search profiles by the first 5 chars of a known username."""
        term = random.choice(ALL_USERNAMES)[:5] if ALL_USERNAMES else "lu"
        self.client.get(
            "/profile/search", params={"q": term}, name="profile/search"
        )

    @task(1)
    def search_messages(self):
        """Search messages by the first 5 chars of a known username."""
        term = random.choice(ALL_USERNAMES)[:5] if ALL_USERNAMES else "lu"
        self.client.get(
            "/messages/search", params={"q": term}, name="messages/search"
        )

    @task(1)
    def view_seo(self):
        self.client.get(
            random.choice(["/robots.txt", "/sitemap.xml"]), name="seo"
        )

    @task(1)
    def view_static(self):
        self.client.get("/static/css/base.css", name="static")

    # ── auth flows ──────────────────────────────────────────────

    @task(1)
    def login_flow(self):
        if not SEED_USERS:
            return
        do_login(self.client, random.choice(SEED_USERS)["email"])

    @task(1)
    def logout_and_relogin(self):
        """Log out, hit the landing page, then log in as a seed user."""
        self.client.get("/auth/logout", name="auth/logout")
        self.client.get("/", name="landing")
        if SEED_USERS:
            do_login(self.client, random.choice(SEED_USERS)["email"])

    @task(1)
    def view_forgot_password(self):
        self.client.get("/auth/forgot-password", name="auth/forgot-password")

    @task(1)
    def request_password_reset(self):
        email = (
            random.choice(SEED_USERS)["email"]
            if SEED_USERS
            else "x@locust.devplace"
        )
        self.client.post(
            "/auth/forgot-password",
            data={"email": email},
            name="auth/forgot-password/post",
        )

    @task(1)
    def reset_password_flow(self):
        """Exercise the reset-password routes with a random 64-hex token."""
        token = uuid.uuid4().hex + uuid.uuid4().hex
        self.client.get(
            f"/auth/reset-password/{token}", name="auth/reset-password"
        )
        self.client.post(
            f"/auth/reset-password/{token}",
            data={"password": PASSWORD, "confirm_password": PASSWORD},
            name="auth/reset-password/post",
        )

    # ── content creation ────────────────────────────────────────

    @task(2)
    def create_post(self):
        """Create a post and register its slug/UID on success."""
        data = {
            "content": "x " * 100,
            "title": f"Load test {uuid.uuid4().hex[:6]}",
            "topic": random.choice(TOPICS),
        }
        with self.client.post(
            "/posts/create", data=data, catch_response=True,
            name="posts/create",
        ) as resp:
            if resp.status_code == 200 and resp.history:
                slug = _extract_slug(resp.url, "posts")
                if slug:
                    if slug not in POST_SLUGS:
                        POST_SLUGS.append(slug)
                    self.own_post_slugs.append(slug)
                m2 = re.search(rf'/votes/post/({UUID_RE})', resp.text)
                if m2 and m2.group(1) not in POST_UIDS:
                    POST_UIDS.append(m2.group(1))
                resp.success()
            else:
                resp.failure("post not created")

    @task(2)
    def comment_on_post(self):
        if not POST_UIDS:
            return
        self.client.post("/comments/create", data={
            "content": f"Comment {uuid.uuid4().hex[:6]} " * 3,
            "post_uid": random.choice(POST_UIDS),
        }, name="comments/create")

    @task(1)
    def create_project(self):
        """Create a project; succeed only if the final URL is a project page."""
        with self.client.post("/projects/create", data={
            "title": f"Project {uuid.uuid4().hex[:6]}",
            "description": "y " * 50,
            "project_type": random.choice(PROJECT_TYPES),
        }, catch_response=True, name="projects/create") as resp:
            # A final URL of /projects/create means the form was shown
            # again, so it must not be registered as a project slug.
            slug = _extract_slug(resp.url, "projects")
            if resp.status_code == 200 and slug:
                if slug not in PROJECT_SLUGS:
                    PROJECT_SLUGS.append(slug)
                self.own_project_slugs.append(slug)
                resp.success()
            else:
                resp.failure("project not created")

    @task(1)
    def create_gist(self):
        """Create a gist; succeed only if the final URL is a gist page."""
        with self.client.post("/gists/create", data={
            "title": f"Gist {uuid.uuid4().hex[:6]}",
            "description": "Load test gist.",
            "source_code": "x = 1\n" * 20,
            "language": random.choice(GIST_LANGUAGES),
        }, catch_response=True, name="gists/create") as resp:
            # Same rule as create_project: "create" is not a gist slug.
            slug = _extract_slug(resp.url, "gists")
            if resp.status_code == 200 and slug:
                if slug not in GIST_SLUGS:
                    GIST_SLUGS.append(slug)
                self.own_gist_slugs.append(slug)
                m2 = re.search(rf'/votes/gist/({UUID_RE})', resp.text)
                if m2 and m2.group(1) not in GIST_UIDS:
                    GIST_UIDS.append(m2.group(1))
                resp.success()
            else:
                resp.failure("gist not created")

    @task(1)
    def create_bug(self):
        self.client.post("/bugs/create", data={
            "title": f"Bug {uuid.uuid4().hex[:6]}",
            "description": "Load test bug report description.",
        }, name="bugs/create")

    # ── social interaction ──────────────────────────────────────

    @task(2)
    def vote_on_post(self):
        if not POST_UIDS:
            return
        self.client.post(
            f"/votes/post/{random.choice(POST_UIDS)}",
            data={"value": random.choice([1, -1])},
            name="votes/post",
        )

    @task(1)
    def vote_on_project(self):
        if not PROJECT_UIDS:
            return
        self.client.post(
            f"/votes/project/{random.choice(PROJECT_UIDS)}",
            data={"value": random.choice([1, -1])},
            name="votes/project",
        )

    @task(1)
    def vote_on_comment(self):
        if not COMMENT_UIDS:
            return
        self.client.post(
            f"/votes/comment/{random.choice(COMMENT_UIDS)}",
            data={"value": random.choice([1, -1])},
            name="votes/comment",
        )

    @task(1)
    def vote_on_gist(self):
        if not GIST_UIDS:
            return
        self.client.post(
            f"/votes/gist/{random.choice(GIST_UIDS)}",
            data={"value": random.choice([1, -1])},
            name="votes/gist",
        )

    @task(1)
    def follow_user(self):
        """Follow a random known user other than this one."""
        targets = [u for u in ALL_USERNAMES if u != self.username]
        if not targets:
            return
        self.client.post(
            f"/follow/{random.choice(targets)}", name="follow/[username]"
        )

    @task(1)
    def unfollow_user(self):
        """Unfollow a random known user other than this one."""
        targets = [u for u in ALL_USERNAMES if u != self.username]
        if not targets:
            return
        self.client.post(
            f"/follow/unfollow/{random.choice(targets)}",
            name="follow/unfollow",
        )

    @task(1)
    def send_message(self):
        if not USER_UIDS or not SEED_USERS:
            return
        target_uid = random.choice(USER_UIDS)
        self.client.post("/messages/send", data={
            "content": f"Msg from {self.username}",
            "receiver_uid": target_uid,
        }, name="messages/send")

    # ── user self-management ────────────────────────────────────

    @task(1)
    def update_profile(self):
        self.client.post("/profile/update", data={
            "bio": f"Updated at {uuid.uuid4().hex[:6]}",
            "location": random.choice(["NL", "US", "DE", "UK", " "]),
        }, name="profile/update")

    @task(1)
    def view_messages(self):
        self.client.get("/messages", name="messages")

    @task(1)
    def view_notifications(self):
        """GET notifications and collect any mark-read UIDs in the page."""
        resp = self.client.get("/notifications", name="notifications")
        matches = re.findall(
            rf'/notifications/mark-read/({UUID_RE})', resp.text
        )
        NOTIFICATION_UIDS.extend(
            m for m in matches if m not in NOTIFICATION_UIDS
        )

    @task(1)
    def mark_all_notifications_read(self):
        self.client.post(
            "/notifications/mark-all-read",
            name="notifications/mark-all-read",
        )

    @task(1)
    def mark_one_notification_read(self):
        if not NOTIFICATION_UIDS:
            return
        self.client.post(
            f"/notifications/mark-read/{random.choice(NOTIFICATION_UIDS)}",
            name="notifications/mark-read",
        )

    @task(1)
    def delete_comment(self):
        if not COMMENT_UIDS:
            return
        self.client.post(
            f"/comments/delete/{random.choice(COMMENT_UIDS)}",
            name="comments/delete",
        )

    # ── owner-scoped mutations ──────────────────────────────────

    @task(1)
    def edit_post(self):
        if not self.own_post_slugs:
            return
        self.client.post(
            f"/posts/edit/{random.choice(self.own_post_slugs)}",
            data={
                "content": f"Edited {uuid.uuid4().hex} " * 3,
                "title": "Edited post",
                "topic": random.choice(TOPICS),
            },
            name="posts/edit",
        )

    @task(1)
    def delete_post(self):
        """Delete this user's newest post and drop it from the registry."""
        if not self.own_post_slugs:
            return
        slug = self.own_post_slugs.pop()
        if slug in POST_SLUGS:
            POST_SLUGS.remove(slug)
        self.client.post(f"/posts/delete/{slug}", name="posts/delete")

    @task(1)
    def edit_gist(self):
        if not self.own_gist_slugs:
            return
        self.client.post(
            f"/gists/edit/{random.choice(self.own_gist_slugs)}",
            data={
                "title": "Edited gist",
                "description": "Edited description.",
                "source_code": "y = 2\n" * 20,
                "language": random.choice(GIST_LANGUAGES),
            },
            name="gists/edit",
        )

    @task(1)
    def delete_gist(self):
        """Delete this user's newest gist and drop it from the registry."""
        if not self.own_gist_slugs:
            return
        slug = self.own_gist_slugs.pop()
        if slug in GIST_SLUGS:
            GIST_SLUGS.remove(slug)
        self.client.post(f"/gists/delete/{slug}", name="gists/delete")

    @task(1)
    def delete_project(self):
        """Delete this user's newest project and drop it from the registry."""
        if not self.own_project_slugs:
            return
        slug = self.own_project_slugs.pop()
        if slug in PROJECT_SLUGS:
            PROJECT_SLUGS.remove(slug)
        self.client.post(f"/projects/delete/{slug}", name="projects/delete")

    @task(1)
    def upload_file(self):
        """Upload a small text file and delete it again if a UID came back."""
        name, content, mime = UPLOAD_FILE
        uid = None
        with self.client.post(
            "/uploads/upload",
            files={"file": (name, content, mime)},
            catch_response=True,
            name="uploads/upload",
        ) as resp:
            if resp.status_code == 201:
                resp.success()
                try:
                    uid = resp.json().get("uid")
                except (ValueError, AttributeError):
                    # Body was not JSON, or not a JSON object.
                    uid = None
            else:
                resp.failure(f"upload failed: status={resp.status_code}")
        if uid:
            self.client.delete(
                f"/uploads/delete/{uid}", name="uploads/delete"
            )

    # ── static / media ──────────────────────────────────────────

    @task(2)
    def view_multiavatar(self):
        seed = random.choice(ALL_USERNAMES) if ALL_USERNAMES else "anon"
        self.client.get(
            f"/avatar/multiavatar/{seed}?size=32", name="avatar/multiavatar"
        )

    @task(1)
    def comment_on_news(self):
        if not NEWS_UIDS:
            return
        self.client.post("/comments/create", data={
            "content": f"News comment {uuid.uuid4().hex[:6]} " * 2,
            "target_uid": random.choice(NEWS_UIDS),
            "target_type": "news",
        }, name="comments/news")


class AdminUser(HttpUser):
    """Low-weight user that exercises the ``/admin`` pages and mutations.

    Logs in as the admin account created by ``seed_data`` when one exists.
    Mutating tasks only act on the dedicated targets in ``ADMIN_TARGETS``
    and are skipped when the corresponding target was not seeded.
    """

    weight = 1
    wait_time = between(2, 6)

    def on_start(self):
        """Log in as the seeded admin, if seeding produced one."""
        if ADMIN_USER:
            do_login(self.client, ADMIN_USER["email"])

    @task(2)
    def view_admin_index(self):
        self.client.get("/admin", name="admin")

    @task(3)
    def view_admin_users(self):
        self.client.get("/admin/users", name="admin/users")

    @task(2)
    def view_admin_settings(self):
        self.client.get("/admin/settings", name="admin/settings")

    @task(2)
    def view_admin_news(self):
        self.client.get("/admin/news", name="admin/news")

    @task(2)
    def view_admin_services(self):
        self.client.get("/admin/services", name="admin/services")

    @task(2)
    def view_admin_services_data(self):
        self.client.get("/admin/services/data", name="admin/services/data")

    @task(1)
    def save_settings(self):
        self.client.post("/admin/settings", data={
            "max_upload_size_mb": "10",
        }, name="admin/settings/save")

    @task(1)
    def news_toggle(self):
        uid = ADMIN_TARGETS.get("news_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/news/{uid}/toggle", name="admin/news/toggle"
        )

    @task(1)
    def news_publish(self):
        uid = ADMIN_TARGETS.get("news_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/news/{uid}/publish", name="admin/news/publish"
        )

    @task(1)
    def news_landing(self):
        uid = ADMIN_TARGETS.get("news_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/news/{uid}/landing", name="admin/news/landing"
        )

    @task(1)
    def news_delete(self):
        # NOTE(review): the target UID stays in ADMIN_TARGETS after this
        # call, so later news_* tasks keep posting to it — presumably
        # intentional for load purposes; confirm.
        uid = ADMIN_TARGETS.get("news_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/news/{uid}/delete", name="admin/news/delete"
        )

    @task(1)
    def user_role(self):
        uid = ADMIN_TARGETS.get("disposable_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/users/{uid}/role",
            data={"role": "Member"},
            name="admin/users/role",
        )

    @task(1)
    def user_password(self):
        uid = ADMIN_TARGETS.get("disposable_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/users/{uid}/password",
            data={"password": PASSWORD},
            name="admin/users/password",
        )

    @task(1)
    def user_toggle(self):
        uid = ADMIN_TARGETS.get("disposable_uid")
        if not uid:
            return
        self.client.post(
            f"/admin/users/{uid}/toggle", name="admin/users/toggle"
        )