Update
All checks were successful
DevPlace CI / test (push) Successful in 5m51s

This commit is contained in:
retoor 2026-05-23 05:44:04 +02:00
parent 4c30a32eb2
commit cb49f0cee1
23 changed files with 612 additions and 291 deletions

View File

@ -6,6 +6,7 @@ from collections import defaultdict
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.exceptions import RequestValidationError
from devplacepy.config import STATIC_DIR, PORT
from devplacepy.database import init_db, get_table, db, get_users_by_uids, get_comment_counts_by_post_uids, get_vote_counts
from devplacepy.templating import templates
@ -49,6 +50,51 @@ async def server_error(request: Request, exc):
seo_ctx = base_seo_context(request, title="Server Error - DevPlace", description="Something went wrong.", robots="noindex")
return templates.TemplateResponse(request, "error.html", {**seo_ctx, "request": request, "error_code": 500, "error_message": "Internal server error"}, status_code=500)
_AUTH_FORM_PAGES = {
"/auth/signup": ("signup.html", "Join DevPlace"),
"/auth/login": ("login.html", "Sign In"),
"/auth/forgot-password": ("forgot_password.html", "Reset Password"),
}
_FRIENDLY_ERRORS = {
("username", "too_short"): "Username must be between 3 and 32 characters",
("username", "too_long"): "Username must be between 3 and 32 characters",
("password", "too_short"): "Password must be at least 6 characters",
}
def _friendly_error(err):
field = err["loc"][-1] if err.get("loc") else ""
key = (field, err.get("type", "").replace("string_", ""))
if key in _FRIENDLY_ERRORS:
return _FRIENDLY_ERRORS[key]
msg = err.get("msg", "Invalid input")
prefix = "Value error, "
return msg[len(prefix):] if msg.startswith(prefix) else msg
@app.exception_handler(RequestValidationError)
async def on_validation_error(request: Request, exc: RequestValidationError):
errors = [_friendly_error(e) for e in exc.errors()]
path = request.url.path
page = _AUTH_FORM_PAGES.get(path)
if page is None and path.startswith("/auth/reset-password/"):
page = ("reset_password.html", "Set New Password")
if page:
template_name, title = page
context = {**base_seo_context(request, title=title, robots="noindex,nofollow"), "request": request, "errors": errors}
try:
form = await request.form()
context.update({k: v for k, v in form.items() if isinstance(v, str)})
except Exception:
pass
if "token" in request.path_params:
context["token"] = request.path_params["token"]
return templates.TemplateResponse(request, template_name, context, status_code=400)
referer = request.headers.get("referer") or "/feed"
return RedirectResponse(url=referer, status_code=303)
app.include_router(auth.router, prefix="/auth")
app.include_router(feed.router, prefix="/feed")
app.include_router(posts.router, prefix="/posts")

View File

@ -1,49 +1,161 @@
from pydantic import BaseModel, Field
from typing import Optional
from datetime import date
from typing import Optional, Literal
from pydantic import BaseModel, Field, field_validator, model_validator
from devplacepy.constants import TOPICS
class SignupRequest(BaseModel):
username: str = Field(min_length=3, max_length=32, pattern=r"^[a-zA-Z0-9_-]+$")
email: str = Field(min_length=5, max_length=255)
class SignupForm(BaseModel):
username: str = Field(min_length=3, max_length=32)
email: str = Field(min_length=1, max_length=255)
password: str = Field(min_length=6, max_length=128)
confirm_password: str = Field(min_length=6, max_length=128)
confirm_password: str = Field(min_length=1, max_length=128)
@field_validator("username")
@classmethod
def username_chars(cls, value):
if not value.isascii() or not all(c.isalnum() or c in ("-", "_") for c in value):
raise ValueError("Username can only contain letters, numbers, hyphens, and underscores")
return value
@field_validator("email")
@classmethod
def email_has_at(cls, value):
if "@" not in value:
raise ValueError("Valid email is required")
return value
@model_validator(mode="after")
def passwords_match(self):
if self.password != self.confirm_password:
raise ValueError("Passwords do not match")
return self
class LoginRequest(BaseModel):
email: str = Field(min_length=5, max_length=255)
class LoginForm(BaseModel):
email: str = Field(min_length=1, max_length=255)
password: str = Field(min_length=1, max_length=128)
remember_me: str = ""
class ForgotPasswordForm(BaseModel):
email: str = Field(min_length=1, max_length=255)
@field_validator("email")
@classmethod
def email_has_at(cls, value):
if "@" not in value:
raise ValueError("Valid email is required")
return value
class ResetPasswordForm(BaseModel):
password: str = Field(min_length=6, max_length=128)
remember_me: bool = False
confirm_password: str = Field(min_length=1, max_length=128)
@model_validator(mode="after")
def passwords_match(self):
if self.password != self.confirm_password:
raise ValueError("Passwords do not match")
return self
class PostCreate(BaseModel):
content: str = Field(min_length=1, max_length=2000)
title: Optional[str] = Field(default=None, max_length=500)
topic: str = Field(pattern=r"^(devlog|showcase|question|rant|fun|random)$")
project_uid: Optional[str] = None
class PostForm(BaseModel):
content: str = Field(min_length=10, max_length=2000)
title: str = Field(default="", max_length=500)
topic: str = "random"
project_uid: str = ""
attachment_uids: list[str] = []
@field_validator("topic")
@classmethod
def valid_topic(cls, value):
return value if value in TOPICS else "random"
class CommentCreate(BaseModel):
content: str = Field(min_length=1, max_length=1000)
parent_uid: Optional[str] = None
class PostEditForm(BaseModel):
content: str = Field(min_length=10, max_length=2000)
title: str = Field(default="", max_length=500)
topic: str = "random"
@field_validator("topic")
@classmethod
def valid_topic(cls, value):
return value if value in TOPICS else "random"
class ProjectCreate(BaseModel):
class CommentForm(BaseModel):
content: str = Field(min_length=3, max_length=1000)
target_uid: str = ""
post_uid: str = ""
target_type: Literal["post", "project", "news", "bug", "gist"] = "post"
parent_uid: str = ""
attachment_uids: list[str] = []
@model_validator(mode="after")
def require_target(self):
if not (self.target_uid or self.post_uid):
raise ValueError("A target is required")
return self
class ProjectForm(BaseModel):
title: str = Field(min_length=1, max_length=200)
description: str = Field(min_length=1, max_length=5000)
release_date: Optional[date] = None
demo_date: Optional[date] = None
project_type: str = Field(pattern=r"^(game|game_asset|software|mobile_app|website)$")
release_date: str = ""
demo_date: str = ""
project_type: Literal["game", "game_asset", "software", "mobile_app", "website"] = "software"
platforms: str = Field(default="", max_length=500)
status: str = Field(default="In Development", max_length=100)
attachment_uids: list[str] = []
class MessageCreate(BaseModel):
class MessageForm(BaseModel):
content: str = Field(min_length=1, max_length=2000)
receiver_uid: str
receiver_uid: str = Field(min_length=1)
attachment_uids: list[str] = []
class ProfileUpdate(BaseModel):
bio: Optional[str] = Field(default=None, max_length=500)
location: Optional[str] = Field(default=None, max_length=200)
git_link: Optional[str] = Field(default=None, max_length=500)
website: Optional[str] = Field(default=None, max_length=500)
class ProfileForm(BaseModel):
bio: str = Field(default="", max_length=500)
location: str = Field(default="", max_length=200)
git_link: str = Field(default="", max_length=500)
website: str = Field(default="", max_length=500)
class GistForm(BaseModel):
title: str = Field(min_length=1, max_length=200)
description: str = Field(default="", max_length=5000)
source_code: str = Field(min_length=1, max_length=50000)
language: str = Field(default="plaintext", max_length=50)
attachment_uids: list[str] = []
class GistEditForm(BaseModel):
title: str = Field(min_length=1, max_length=200)
description: str = Field(default="", max_length=5000)
source_code: str = Field(min_length=1, max_length=50000)
language: str = Field(default="plaintext", max_length=50)
class BugForm(BaseModel):
title: str = Field(min_length=1, max_length=200)
description: str = Field(min_length=1, max_length=5000)
attachment_uids: list[str] = []
class VoteForm(BaseModel):
value: int
@field_validator("value")
@classmethod
def valid_value(cls, value):
if value not in (1, -1):
raise ValueError("value must be 1 or -1")
return value
class AdminRoleForm(BaseModel):
role: Literal["member", "admin"]
class AdminPasswordForm(BaseModel):
password: str = Field(min_length=6, max_length=128)

View File

@ -1,6 +1,8 @@
import logging
from typing import Annotated
from datetime import datetime
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from devplacepy.models import AdminRoleForm, AdminPasswordForm
from fastapi.responses import HTMLResponse, RedirectResponse
from devplacepy.database import get_table, db, build_pagination
from devplacepy.templating import templates
@ -51,12 +53,9 @@ async def admin_users(request: Request, page: int = 1):
@router.post("/users/{uid}/role")
async def admin_user_role(request: Request, uid: str):
async def admin_user_role(request: Request, uid: str, data: Annotated[AdminRoleForm, Form()]):
admin = require_admin(request)
form = await request.form()
role = form.get("role", "").strip().capitalize()
if role not in ("Member", "Admin"):
return RedirectResponse(url="/admin/users", status_code=302)
role = data.role.capitalize()
if uid == admin["uid"]:
return RedirectResponse(url="/admin/users", status_code=302)
users = get_table("users")
@ -66,14 +65,10 @@ async def admin_user_role(request: Request, uid: str):
@router.post("/users/{uid}/password")
async def admin_user_password(request: Request, uid: str):
async def admin_user_password(request: Request, uid: str, data: Annotated[AdminPasswordForm, Form()]):
admin = require_admin(request)
form = await request.form()
password = form.get("password", "")
if len(password) < 6:
return RedirectResponse(url="/admin/users", status_code=302)
users = get_table("users")
users.update({"uid": uid, "password_hash": hash_password(password)}, ["uid"])
users.update({"uid": uid, "password_hash": hash_password(data.password)}, ["uid"])
logger.info(f"Admin {admin['username']} changed password for user {uid}")
return RedirectResponse(url="/admin/users", status_code=302)

View File

@ -1,13 +1,15 @@
import hashlib
import secrets
import logging
from typing import Annotated
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from fastapi.responses import RedirectResponse, HTMLResponse
from devplacepy.database import get_table
from devplacepy.templating import templates
from devplacepy.utils import hash_password, verify_password, create_session, generate_uid, get_current_user
from devplacepy.seo import base_seo_context
from devplacepy.models import SignupForm, LoginForm, ForgotPasswordForm, ResetPasswordForm
logger = logging.getLogger(__name__)
router = APIRouter()
@ -42,25 +44,12 @@ async def login_page(request: Request):
@router.post("/signup")
async def signup(request: Request):
form = await request.form()
username = form.get("username", "").strip()
email = form.get("email", "").strip().lower()
password = form.get("password", "")
confirm_password = form.get("confirm_password", "")
async def signup(request: Request, data: Annotated[SignupForm, Form()]):
username = data.username
email = data.email.strip().lower()
password = data.password
errors = []
if len(username) < 3 or len(username) > 32:
errors.append("Username must be between 3 and 32 characters")
if not username.isascii() or not all(c.isalnum() or c in ("-", "_") for c in username):
errors.append("Username can only contain letters, numbers, hyphens, and underscores")
if not email or "@" not in email or len(email) > 255:
errors.append("Valid email is required")
if len(password) < 6:
errors.append("Password must be at least 6 characters")
if password != confirm_password:
errors.append("Passwords do not match")
users = get_table("users")
if users.find_one(username=username):
errors.append("Username already taken")
@ -109,18 +98,14 @@ async def signup(request: Request):
@router.post("/login")
async def login(request: Request):
form = await request.form()
email = form.get("email", "").strip().lower()
password = form.get("password", "")
remember_me = form.get("remember_me") == "on"
async def login(request: Request, data: Annotated[LoginForm, Form()]):
email = data.email.strip().lower()
password = data.password
remember_me = data.remember_me == "on"
errors = []
if not email or not password:
errors.append("Email and password are required")
users = get_table("users")
user = users.find_one(email=email) if not errors else None
user = users.find_one(email=email)
if not user or not verify_password(password, user["password_hash"]):
errors.append("Invalid email or password")
@ -151,18 +136,9 @@ async def forgot_password_page(request: Request):
@router.post("/forgot-password")
async def forgot_password(request: Request):
form = await request.form()
email = form.get("email", "").strip().lower()
errors = []
if not email or "@" not in email:
errors.append("Valid email is required")
async def forgot_password(request: Request, data: Annotated[ForgotPasswordForm, Form()]):
email = data.email.strip().lower()
seo_ctx = base_seo_context(request, title="Reset Password", robots="noindex,nofollow")
if errors:
return templates.TemplateResponse(request, "forgot_password.html", {
**seo_ctx, "request": request, "errors": errors,
})
users = get_table("users")
user = users.find_one(email=email)
@ -198,22 +174,9 @@ async def reset_password_page(request: Request, token: str):
@router.post("/reset-password/{token}")
async def reset_password(request: Request, token: str):
form = await request.form()
password = form.get("password", "")
confirm = form.get("confirm_password", "")
async def reset_password(request: Request, token: str, data: Annotated[ResetPasswordForm, Form()]):
password = data.password
errors = []
if len(password) < 6:
errors.append("Password must be at least 6 characters")
if password != confirm:
errors.append("Passwords do not match")
if errors:
seo_ctx = base_seo_context(request, title="Set New Password", robots="noindex,nofollow")
return templates.TemplateResponse(request, "reset_password.html", {
**seo_ctx, "request": request, "token": token, "errors": errors,
})
resets = get_table("password_resets")
users = get_table("users")

View File

@ -1,7 +1,9 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from devplacepy.models import BugForm
from devplacepy.database import get_table, load_comments
from devplacepy.attachments import get_attachments_batch, link_attachments
from devplacepy.templating import templates
@ -52,14 +54,10 @@ async def bugs_page(request: Request):
@router.post("/create")
async def create_bug(request: Request):
async def create_bug(request: Request, data: Annotated[BugForm, Form()]):
user = require_user(request)
form = await request.form()
title = form.get("title", "").strip()
description = form.get("description", "").strip()
if not title or not description:
return RedirectResponse(url="/bugs", status_code=302)
title = data.title.strip()
description = data.description.strip()
bugs_table = get_table("bug_reports")
bug_uid = generate_uid()
@ -72,9 +70,8 @@ async def create_bug(request: Request):
"created_at": datetime.now(timezone.utc).isoformat(),
})
attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else []
if attachment_uids:
link_attachments(attachment_uids, "bug", bug_uid)
if data.attachment_uids:
link_attachments(data.attachment_uids, "bug", bug_uid)
create_mention_notifications(description, user["uid"], f"/bugs?highlight={bug_uid}")
logger.info(f"Bug report created by {user['username']}: {title}")

View File

@ -1,11 +1,13 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from fastapi.responses import RedirectResponse
from devplacepy.database import get_table, resolve_by_slug
from devplacepy.attachments import link_attachments, delete_target_attachments
from devplacepy.templating import clear_unread_cache
from devplacepy.utils import generate_uid, require_user, create_mention_notifications
from devplacepy.models import CommentForm
logger = logging.getLogger(__name__)
router = APIRouter()
@ -32,23 +34,15 @@ def resolve_target_redirect(target_type, target_uid):
@router.post("/create")
async def create_comment(request: Request):
async def create_comment(request: Request, data: Annotated[CommentForm, Form()]):
user = require_user(request)
form = await request.form()
content = form.get("content", "").strip()
target_uid = form.get("target_uid") or form.get("post_uid", "")
target_type = form.get("target_type", "post")
parent_uid = form.get("parent_uid", "")
content = data.content.strip()
target_uid = data.target_uid or data.post_uid
target_type = data.target_type
parent_uid = data.parent_uid
redirect_url = resolve_target_redirect(target_type, target_uid)
if not content or not target_uid:
return RedirectResponse(url=redirect_url, status_code=302)
if len(content) < 3:
return RedirectResponse(url=redirect_url, status_code=302)
if len(content) > 1000:
return RedirectResponse(url=redirect_url, status_code=302)
comment_uid = generate_uid()
insert = {
"uid": comment_uid,
@ -63,9 +57,8 @@ async def create_comment(request: Request):
insert["post_uid"] = target_uid
get_table("comments").insert(insert)
attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else []
if attachment_uids:
link_attachments(attachment_uids, "comment", comment_uid)
if data.attachment_uids:
link_attachments(data.attachment_uids, "comment", comment_uid)
badges = get_table("badges")
existing = badges.find_one(user_uid=user["uid"], badge_name="First Comment")

View File

@ -1,6 +1,8 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException
from fastapi import APIRouter, Request, HTTPException, Form
from devplacepy.models import GistForm, GistEditForm
from fastapi.responses import HTMLResponse, RedirectResponse
from devplacepy.database import get_table, load_comments, get_vote_counts, resolve_by_slug, db
from devplacepy.attachments import get_attachments, delete_target_attachments
@ -127,24 +129,12 @@ async def gist_detail(request: Request, gist_slug: str):
@router.post("/create")
async def create_gist(request: Request):
async def create_gist(request: Request, data: Annotated[GistForm, Form()]):
user = require_user(request)
form = await request.form()
title = form.get("title", "").strip()
description = form.get("description", "").strip()
source_code = form.get("source_code", "").strip()
language = form.get("language", "plaintext")
if not title:
return RedirectResponse(url="/gists", status_code=302)
if len(title) > 200:
return RedirectResponse(url="/gists", status_code=302)
if not source_code:
return RedirectResponse(url="/gists", status_code=302)
if len(source_code) > 50000:
return RedirectResponse(url="/gists", status_code=302)
if len(description) > 5000:
return RedirectResponse(url="/gists", status_code=302)
title = data.title.strip()
description = data.description.strip()
source_code = data.source_code.strip()
language = data.language
valid_languages = {l[0] for l in LANGUAGES}
if language not in valid_languages:
@ -165,10 +155,8 @@ async def create_gist(request: Request):
"created_at": datetime.now(timezone.utc).isoformat(),
})
attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else []
from devplacepy.attachments import link_attachments
link_attachments(attachment_uids, "gist", uid)
link_attachments(data.attachment_uids, "gist", uid)
create_mention_notifications(description or "", user["uid"], f"/gists/{gist_slug}")
@ -177,25 +165,17 @@ async def create_gist(request: Request):
@router.post("/edit/{gist_slug}")
async def edit_gist(request: Request, gist_slug: str):
async def edit_gist(request: Request, gist_slug: str, data: Annotated[GistEditForm, Form()]):
user = require_user(request)
gists = get_table("gists")
gist = resolve_by_slug(gists, gist_slug)
if not gist or gist["user_uid"] != user["uid"]:
return RedirectResponse(url="/gists", status_code=302)
form = await request.form()
title = form.get("title", "").strip()
description = form.get("description", "").strip()
source_code = form.get("source_code", "").strip()
language = form.get("language", "plaintext")
if not title or len(title) > 200:
return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302)
if not source_code or len(source_code) > 50000:
return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302)
if len(description) > 5000:
return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302)
title = data.title.strip()
description = data.description.strip()
source_code = data.source_code.strip()
language = data.language
valid_languages = {l[0] for l in LANGUAGES}
if language not in valid_languages:

View File

@ -1,6 +1,8 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from devplacepy.models import MessageForm
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from devplacepy.database import get_table, db
from devplacepy.attachments import get_attachments_batch
@ -145,43 +147,39 @@ async def search_users(request: Request, q: str = ""):
@router.post("/send")
async def send_message(request: Request):
async def send_message(request: Request, data: Annotated[MessageForm, Form()]):
user = require_user(request)
form = await request.form()
content = form.get("content", "").strip()
receiver_uid = form.get("receiver_uid", "")
content = data.content.strip()
receiver_uid = data.receiver_uid
if content and receiver_uid:
messages_table = get_table("messages")
msg_uid = generate_uid()
messages_table.insert({
"uid": msg_uid,
"sender_uid": user["uid"],
"receiver_uid": receiver_uid,
"content": content,
messages_table = get_table("messages")
msg_uid = generate_uid()
messages_table.insert({
"uid": msg_uid,
"sender_uid": user["uid"],
"receiver_uid": receiver_uid,
"content": content,
"read": False,
"created_at": datetime.now(timezone.utc).isoformat(),
})
from devplacepy.attachments import link_attachments
link_attachments(data.attachment_uids, "message", msg_uid)
if user["uid"] != receiver_uid:
notifications = get_table("notifications")
notifications.insert({
"uid": generate_uid(),
"user_uid": receiver_uid,
"type": "message",
"message": f"{user['username']} sent you a message",
"related_uid": user["uid"],
"read": False,
"created_at": datetime.now(timezone.utc).isoformat(),
})
clear_unread_cache(receiver_uid)
attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else []
create_mention_notifications(content, user["uid"], f"/messages?with_uid={receiver_uid}")
from devplacepy.attachments import link_attachments
link_attachments(attachment_uids, "message", msg_uid)
if user["uid"] != receiver_uid:
notifications = get_table("notifications")
notifications.insert({
"uid": generate_uid(),
"user_uid": receiver_uid,
"type": "message",
"message": f"{user['username']} sent you a message",
"related_uid": user["uid"],
"read": False,
"created_at": datetime.now(timezone.utc).isoformat(),
})
clear_unread_cache(receiver_uid)
create_mention_notifications(content, user["uid"], f"/messages?with_uid={receiver_uid}")
logger.info(f"Message {msg_uid} sent from {user['username']} to {receiver_uid}")
return RedirectResponse(url=f"/messages?with_uid={receiver_uid}", status_code=302)
logger.info(f"Message {msg_uid} sent from {user['username']} to {receiver_uid}")
return RedirectResponse(url=f"/messages?with_uid={receiver_uid}", status_code=302)

View File

@ -1,6 +1,7 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException
from fastapi import APIRouter, Request, HTTPException, Form
from fastapi.responses import RedirectResponse, HTMLResponse
from devplacepy.constants import TOPICS
from devplacepy.database import get_table, get_comment_counts_by_post_uids, load_comments, db, resolve_by_slug
@ -8,36 +9,24 @@ from devplacepy.templating import templates
from devplacepy.utils import generate_uid, get_current_user, require_user, time_ago, make_combined_slug, create_mention_notifications
from devplacepy.seo import base_seo_context, site_url, website_schema, breadcrumb_schema, discussion_forum_posting, combine, truncate
from devplacepy.attachments import get_attachments, link_attachments, delete_target_attachments, save_inline_image, delete_inline_image
from devplacepy.models import PostForm, PostEditForm
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/create")
async def create_post(request: Request):
async def create_post(request: Request, data: Annotated[PostForm, Form()]):
user = require_user(request)
form = await request.form()
content = form.get("content", "").strip()
title = form.get("title", "").strip()
topic = form.get("topic", "random")
project_uid = form.get("project_uid", "")
errors = []
if not content:
errors.append("Content is required")
if content and len(content) < 10:
errors.append("Content must be at least 10 characters")
if len(content) > 2000:
errors.append("Content too long (max 2000 characters)")
if topic not in TOPICS:
topic = "random"
if errors:
return RedirectResponse(url="/feed", status_code=302)
content = data.content.strip()
title = data.title.strip()
topic = data.topic
project_uid = data.project_uid
image_filename = None
form = await request.form()
image_file = form.get("image")
if image_file and hasattr(image_file, "filename") and image_file.filename:
if image_file is not None and hasattr(image_file, "filename") and image_file.filename:
image_filename = save_inline_image(await image_file.read(), image_file.filename)
if image_filename:
content += f"\n\n![](/static/uploads/{image_filename})"
@ -69,9 +58,8 @@ async def create_post(request: Request):
"created_at": datetime.now(timezone.utc).isoformat(),
})
attachment_uids = form.getlist("attachment_uids")
if attachment_uids:
link_attachments(attachment_uids, "post", uid)
if data.attachment_uids:
link_attachments(data.attachment_uids, "post", uid)
create_mention_notifications(content, user["uid"], f"/posts/{post_slug}")
logger.info(f"Post {uid} created by {user['username']}")
@ -146,36 +134,18 @@ async def view_post(request: Request, post_slug: str):
@router.post("/edit/{post_slug}")
async def edit_post(request: Request, post_slug: str):
async def edit_post(request: Request, post_slug: str, data: Annotated[PostEditForm, Form()]):
user = require_user(request)
form = await request.form()
content = form.get("content", "").strip()
title = form.get("title", "").strip()
topic = form.get("topic", "random")
errors = []
if not content:
errors.append("Content is required")
if content and len(content) < 10:
errors.append("Content must be at least 10 characters")
if len(content) > 2000:
errors.append("Content too long (max 2000 characters)")
if topic not in TOPICS:
topic = "random"
posts = get_table("posts")
post = resolve_by_slug(posts, post_slug)
if not post or post["user_uid"] != user["uid"]:
return RedirectResponse(url="/feed", status_code=302)
if errors:
return RedirectResponse(url=f"/posts/{post['slug'] or post['uid']}", status_code=302)
posts.update({
"uid": post["uid"],
"content": content,
"title": title or None,
"topic": topic,
"content": data.content.strip(),
"title": data.title.strip() or None,
"topic": data.topic,
}, ["uid"])
logger.info(f"Post {post['uid']} edited by {user['username']}")

View File

@ -1,5 +1,7 @@
import logging
from fastapi import APIRouter, Request
from typing import Annotated
from fastapi import APIRouter, Request, Form
from devplacepy.models import ProfileForm
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from devplacepy.database import get_table, db
from devplacepy.templating import templates
@ -10,6 +12,22 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/search")
async def search_users(request: Request, q: str = ""):
require_user(request)
if not q or len(q) < 1:
return JSONResponse({"results": []})
if "users" in db.tables:
rows = db.query(
"SELECT uid, username FROM users WHERE username LIKE :q LIMIT 10",
q=f"%{q}%",
)
results = [{"uid": r["uid"], "username": r["username"]} for r in rows]
else:
results = []
return JSONResponse({"results": results})
@router.get("/{username}", response_class=HTMLResponse)
async def profile_page(request: Request, username: str, tab: str = "posts"):
current_user = get_current_user(request)
@ -95,37 +113,16 @@ async def profile_page(request: Request, username: str, tab: str = "posts"):
})
@router.get("/search")
async def search_users(request: Request, q: str = ""):
require_user(request)
if not q or len(q) < 1:
return JSONResponse({"results": []})
if "users" in db.tables:
rows = db.query(
"SELECT uid, username FROM users WHERE username LIKE :q LIMIT 10",
q=f"%{q}%",
)
results = [{"uid": r["uid"], "username": r["username"]} for r in rows]
else:
results = []
return JSONResponse({"results": results})
@router.post("/update")
async def update_profile(request: Request):
async def update_profile(request: Request, data: Annotated[ProfileForm, Form()]):
user = require_user(request)
form = await request.form()
bio = form.get("bio", "").strip()
location = form.get("location", "").strip()
git_link = form.get("git_link", "").strip()
website = form.get("website", "").strip()
users = get_table("users")
users.update({
"uid": user["uid"],
"bio": bio,
"location": location,
"git_link": git_link,
"website": website,
"bio": data.bio.strip(),
"location": data.location.strip(),
"git_link": data.git_link.strip(),
"website": data.website.strip(),
}, ["uid"])
logger.info(f"Profile updated for {user['username']}")

View File

@ -1,6 +1,8 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException
from fastapi import APIRouter, Request, HTTPException, Form
from devplacepy.models import ProjectForm
from fastapi.responses import HTMLResponse, RedirectResponse
from devplacepy.database import get_table, get_vote_counts, load_comments, resolve_by_slug
from devplacepy.attachments import link_attachments, get_attachments, delete_target_attachments
@ -147,19 +149,10 @@ async def delete_project(request: Request, project_slug: str):
@router.post("/create")
async def create_project(request: Request):
async def create_project(request: Request, data: Annotated[ProjectForm, Form()]):
user = require_user(request)
form = await request.form()
title = form.get("title", "").strip()
description = form.get("description", "").strip()
release_date = form.get("release_date", "")
demo_date = form.get("demo_date", "")
project_type = form.get("project_type", "software")
platforms = form.get("platforms", "").strip()
status = form.get("status", "In Development")
if not title or not description:
return RedirectResponse(url="/projects", status_code=302)
title = data.title.strip()
description = data.description.strip()
projects = get_table("projects")
uid = generate_uid()
@ -170,18 +163,17 @@ async def create_project(request: Request):
"title": title,
"slug": project_slug,
"description": description,
"release_date": release_date or None,
"demo_date": demo_date or None,
"project_type": project_type,
"platforms": platforms,
"status": status,
"release_date": data.release_date or None,
"demo_date": data.demo_date or None,
"project_type": data.project_type,
"platforms": data.platforms.strip(),
"status": data.status,
"stars": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
})
attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else []
if attachment_uids:
link_attachments(attachment_uids, "project", uid)
if data.attachment_uids:
link_attachments(data.attachment_uids, "project", uid)
create_mention_notifications(description, user["uid"], f"/projects/{project_slug}")
logger.info(f"Project {uid} created by {user['username']}")

View File

@ -1,23 +1,21 @@
import logging
from typing import Annotated
from datetime import datetime, timezone
from fastapi import APIRouter, Request
from fastapi import APIRouter, Request, Form
from fastapi.responses import RedirectResponse
from devplacepy.database import get_table
from devplacepy.templating import clear_unread_cache
from devplacepy.utils import generate_uid, require_user
from devplacepy.models import VoteForm
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/{target_type}/{target_uid}")
async def vote(request: Request, target_type: str, target_uid: str):
async def vote(request: Request, target_type: str, target_uid: str, data: Annotated[VoteForm, Form()]):
user = require_user(request)
form = await request.form()
value = int(form.get("value", "1"))
if value not in (1, -1):
return RedirectResponse(url="/feed", status_code=302)
value = data.value
votes = get_table("votes")
existing = votes.find_one(user_uid=user["uid"], target_uid=target_uid, target_type=target_type)

View File

@ -129,3 +129,59 @@ def test_admin_news_pagination(alice):
next_btn.click()
page.wait_for_url(f"{BASE_URL}/admin/news?page=2", wait_until="domcontentloaded")
assert "Page 2" in page.text_content(".pagination-info")
def _seed_target_user():
uid = str(uuid4())
get_table("users").insert({
"uid": uid, "username": f"target_{uid[:8]}", "email": f"target_{uid[:8]}@test.devplace",
"password_hash": "x", "role": "Member", "is_active": True,
"created_at": datetime.now(timezone.utc).isoformat(),
})
return uid
def test_admin_settings_save(alice):
page, _ = alice
page.goto(f"{BASE_URL}/admin/settings", wait_until="domcontentloaded")
assert page.is_visible("#site_name")
newval = f"model-{uuid4().hex[:8]}"
page.fill("#news_ai_model", newval)
page.click("button:has-text('Save Settings')")
page.wait_for_url("**/admin/settings", wait_until="domcontentloaded")
assert page.locator("#news_ai_model").input_value() == newval
def test_admin_change_user_role(alice):
page, _ = alice
uid = _seed_target_user()
page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded")
sel = f"form[action='/admin/users/{uid}/role'] select[name='role']"
page.locator(sel).wait_for(state="visible")
page.select_option(sel, "admin")
page.wait_for_url("**/admin/users**", wait_until="domcontentloaded")
page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded")
assert page.locator(sel).input_value() == "admin"
def test_admin_toggle_user_active(alice):
page, _ = alice
uid = _seed_target_user()
page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded")
btn = f"form[action='/admin/users/{uid}/toggle'] button"
before = page.locator(btn).inner_text()
page.locator(btn).click()
page.wait_for_url("**/admin/users**", wait_until="domcontentloaded")
assert page.locator(btn).inner_text() != before
def test_admin_news_featured_toggle(alice):
page, _ = alice
seed_admin_news()
page.goto(f"{BASE_URL}/admin/news", wait_until="domcontentloaded")
toggle_sel = "form[action$='/toggle'] button.admin-toggle-switch"
was_active = "active" in (page.locator(toggle_sel).first.get_attribute("class") or "")
page.locator(toggle_sel).first.click()
page.wait_for_url(f"{BASE_URL}/admin/news", wait_until="domcontentloaded")
is_active = "active" in (page.locator(toggle_sel).first.get_attribute("class") or "")
assert is_active != was_active

View File

@ -1,4 +1,66 @@
import hashlib
import time
from datetime import datetime, timedelta, timezone
from tests.conftest import BASE_URL
from devplacepy.database import get_table
from devplacepy.utils import hash_password, generate_uid
def test_forgot_password_page_loads(page, app_server):
page.goto(f"{BASE_URL}/auth/forgot-password", wait_until="domcontentloaded")
assert page.is_visible("input#email")
assert page.is_visible("button:has-text('Send Reset Link')")
def test_forgot_password_submit_shows_sent(page, app_server):
page.goto(f"{BASE_URL}/auth/forgot-password", wait_until="domcontentloaded")
page.fill("#email", "anybody@example.com")
page.click("button:has-text('Send Reset Link')")
page.locator(".auth-success").wait_for(state="visible")
def test_reset_password_page_loads(page, app_server):
page.goto(f"{BASE_URL}/auth/reset-password/sometoken", wait_until="domcontentloaded")
assert page.is_visible("input#password")
assert page.is_visible("input#confirm_password")
def test_reset_password_mismatch_shows_error(page, app_server):
page.goto(f"{BASE_URL}/auth/reset-password/sometoken", wait_until="domcontentloaded")
page.fill("#password", "abcdef1")
page.fill("#confirm_password", "different1")
page.click("button:has-text('Reset Password')")
page.locator(".auth-error").wait_for(state="visible")
def test_reset_password_full_flow(page, app_server):
uname = f"reset_{int(time.time() * 1000)}"
email = f"{uname}@t.dev"
uid = generate_uid()
get_table("users").insert({
"uid": uid, "username": uname, "email": email,
"password_hash": hash_password("oldpass123"),
"bio": "", "location": "", "git_link": "", "website": "",
"role": "Member", "is_active": True, "level": 1, "xp": 0, "stars": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
})
token = f"tok{uid[:8]}"
get_table("password_resets").insert({
"uid": generate_uid(), "user_uid": uid,
"token": hashlib.sha256(token.encode()).hexdigest(),
"expires_at": (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat(),
"used": False, "created_at": datetime.now(timezone.utc).isoformat(),
})
page.goto(f"{BASE_URL}/auth/reset-password/{token}", wait_until="domcontentloaded")
page.fill("#password", "newpass456")
page.fill("#confirm_password", "newpass456")
page.click("button:has-text('Reset Password')")
page.wait_for_url("**/auth/login", wait_until="domcontentloaded")
page.fill("#email", email)
page.fill("#password", "newpass456")
page.click("button:has-text('Sign in')")
page.wait_for_url("**/feed", wait_until="domcontentloaded")
assert "/feed" in page.url
def test_signup_page_loads(page, app_server):

View File

@ -1,4 +1,13 @@
import requests
from devplacepy.avatar import avatar_url, generate_avatar_svg
from tests.conftest import BASE_URL
def test_avatar_endpoint_serves_svg(app_server):
r = requests.get(f"{BASE_URL}/avatar/multiavatar/alice_test?size=64")
assert r.status_code == 200
assert "svg" in r.headers.get("content-type", "").lower()
assert "<svg" in r.text
def test_avatar_url_format():

View File

@ -0,0 +1,8 @@
import requests
from tests.conftest import BASE_URL
def test_404_renders_error_page(app_server):
r = requests.get(f"{BASE_URL}/this-page-does-not-exist-xyz")
assert r.status_code == 404
assert "404" in r.text or "not found" in r.text.lower()

19
tests/test_follow.py Normal file
View File

@ -0,0 +1,19 @@
from tests.conftest import BASE_URL
FOLLOW = "form[action='/follow/bob_test'] button"
UNFOLLOW = "form[action='/follow/unfollow/bob_test'] button"
def test_follow_then_unfollow(alice):
page, _ = alice
page.goto(f"{BASE_URL}/profile/bob_test", wait_until="domcontentloaded")
if page.is_visible(UNFOLLOW):
page.click(UNFOLLOW)
page.wait_for_url("**/profile/bob_test", wait_until="domcontentloaded")
assert page.is_visible(FOLLOW)
page.click(FOLLOW)
page.wait_for_url("**/profile/bob_test", wait_until="domcontentloaded")
assert page.is_visible(UNFOLLOW)
page.click(UNFOLLOW)
page.wait_for_url("**/profile/bob_test", wait_until="domcontentloaded")
assert page.is_visible(FOLLOW)

View File

@ -149,6 +149,15 @@ def test_gist_detail_share_button(alice, app_server):
assert_share_copies(page, "/gists/")
def test_gist_code_copy_button(alice, app_server):
from playwright.sync_api import expect
page, _ = alice
_create_gist(page, title=f"Copy Gist {int(time.time())}", source_code="copyme = 42")
btn = page.locator("button[data-copy='gist-code-content']")
btn.click()
expect(btn).to_have_text("Copied!", timeout=3000)
def test_profile_gists_tab(alice, app_server):
page, alice_user = alice
title = f"Profile Tab Test {int(time.time())}"

View File

@ -1,4 +1,17 @@
import time
from tests.conftest import BASE_URL
from devplacepy.database import get_table
def test_send_message_appears_in_thread(alice):
page, _ = alice
bob = get_table("users").find_one(username="bob_test")
page.goto(f"{BASE_URL}/messages?with_uid={bob['uid']}", wait_until="domcontentloaded")
msg = f"Hello bob {int(time.time() * 1000)}"
page.fill("input[name='content']", msg)
page.locator(".messages-send-btn").click()
page.wait_for_url("**/messages**", wait_until="domcontentloaded")
assert page.is_visible(f".message-bubble:has-text('{msg}')")
def test_messages_page_loads(alice):

12
tests/test_mobile.py Normal file
View File

@ -0,0 +1,12 @@
from tests.conftest import BASE_URL
def test_mobile_hamburger_nav(alice):
page, _ = alice
page.set_viewport_size({"width": 390, "height": 844})
page.goto(f"{BASE_URL}/feed", wait_until="domcontentloaded")
btn = page.locator("#hamburger-btn")
btn.wait_for(state="visible")
btn.click()
page.locator("#mobile-panel.open").wait_for(state="visible")
assert "open" in (page.locator("#mobile-panel").get_attribute("class") or "")

View File

@ -190,3 +190,52 @@ def test_post_across_all_topics(alice):
badge = page.locator(f".badge-{topic}")
assert badge.is_visible()
page.wait_for_timeout(200)
def test_delete_own_post(alice):
page, _ = alice
create_post(page, "random", "Post to be deleted permanently here")
post_url = page.url
page.once("dialog", lambda d: d.accept())
page.locator(".post-action-btn:has-text('Delete')").click()
page.wait_for_url(f"{BASE_URL}/feed", wait_until="domcontentloaded")
resp = page.goto(post_url, wait_until="domcontentloaded")
assert resp.status == 404
def test_content_rendering_markdown_and_highlight(alice):
page, _ = alice
create_post(page, "random", "Hello **world bold** text\n\n```python\nprint('hi')\n```")
page.locator(".rendered-content strong").first.wait_for(state="visible")
assert page.locator(".rendered-content pre code").count() >= 1
def test_mention_autocomplete(alice):
page, _ = alice
create_post(page, "random", "Post for mention autocomplete here")
ta = page.locator(".comment-form textarea[data-mention]").first
ta.click()
ta.press_sequentially("@bob")
item = page.locator(".mention-dropdown-item").first
item.wait_for(state="visible")
assert "bob" in item.inner_text().lower()
def test_emoji_picker_opens(alice):
page, _ = alice
create_post(page, "random", "Post for emoji picker here")
page.locator(".emoji-toggle-btn").first.click()
page.locator(".emoji-picker-wrapper").first.wait_for(state="visible")
assert page.locator("emoji-picker").first.is_visible()
def test_attachment_upload_ui(alice):
import io
from PIL import Image
page, _ = alice
create_post(page, "random", "Post for attachment upload UI")
buf = io.BytesIO()
Image.new("RGB", (4, 4), (0, 128, 255)).save(buf, "PNG")
file_input = page.locator(".comment-form .attachment-upload-container input[type='file']").first
file_input.set_input_files({"name": "pic.png", "mimeType": "image/png", "buffer": buf.getvalue()})
page.locator(".attachment-preview[data-uid]").first.wait_for(state="visible", timeout=15000)

View File

@ -1,6 +1,14 @@
from tests.conftest import BASE_URL
def test_profile_search_returns_results(alice):
page, _ = alice
resp = page.request.get(f"{BASE_URL}/profile/search?q=bob")
assert resp.status == 200
data = resp.json()
assert any("bob" in u["username"] for u in data["results"])
def test_profile_page_loads(alice):
page, user = alice
page.goto(f"{BASE_URL}/profile/{user['username']}", wait_until="domcontentloaded")

View File

@ -1,6 +1,41 @@
from tests.conftest import BASE_URL, assert_share_copies
def _create_project(page, title):
page.goto(f"{BASE_URL}/projects", wait_until="domcontentloaded")
page.locator("#create-project-btn").click()
page.fill("#title", title)
page.fill("#description", "Project for testing")
page.fill("#release_date", "2026-06-01")
page.check("input[value='game']")
page.locator("#platforms-input").fill("PC")
page.locator("#platforms-input").press("Enter")
page.click("button:has-text('Create Project')")
page.wait_for_url(f"{BASE_URL}/projects/*", wait_until="domcontentloaded")
def test_project_vote(alice):
page, _ = alice
_create_project(page, "Votable Project")
star = "form[action*='/votes/project/'] button"
before = int(page.locator(star).first.inner_text().strip(""))
page.locator(star).first.click()
page.wait_for_url(f"{BASE_URL}/projects/*", wait_until="domcontentloaded")
after = int(page.locator(star).first.inner_text().strip(""))
assert after == before + 1
def test_delete_own_project(alice):
page, _ = alice
_create_project(page, "Deletable Project XYZ")
proj_url = page.url
page.once("dialog", lambda d: d.accept())
page.locator("form[action*='/projects/delete/'] button").click()
page.wait_for_url(f"{BASE_URL}/projects", wait_until="domcontentloaded")
resp = page.goto(proj_url, wait_until="domcontentloaded")
assert resp.status == 404
def test_project_detail_share_button(alice):
page, _ = alice
page.goto(f"{BASE_URL}/projects", wait_until="domcontentloaded")