diff --git a/devplacepy/main.py b/devplacepy/main.py index 90441d7..ed4e20d 100644 --- a/devplacepy/main.py +++ b/devplacepy/main.py @@ -6,6 +6,7 @@ from collections import defaultdict from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles +from fastapi.exceptions import RequestValidationError from devplacepy.config import STATIC_DIR, PORT from devplacepy.database import init_db, get_table, db, get_users_by_uids, get_comment_counts_by_post_uids, get_vote_counts from devplacepy.templating import templates @@ -49,6 +50,51 @@ async def server_error(request: Request, exc): seo_ctx = base_seo_context(request, title="Server Error - DevPlace", description="Something went wrong.", robots="noindex") return templates.TemplateResponse(request, "error.html", {**seo_ctx, "request": request, "error_code": 500, "error_message": "Internal server error"}, status_code=500) + +_AUTH_FORM_PAGES = { + "/auth/signup": ("signup.html", "Join DevPlace"), + "/auth/login": ("login.html", "Sign In"), + "/auth/forgot-password": ("forgot_password.html", "Reset Password"), +} + +_FRIENDLY_ERRORS = { + ("username", "too_short"): "Username must be between 3 and 32 characters", + ("username", "too_long"): "Username must be between 3 and 32 characters", + ("password", "too_short"): "Password must be at least 6 characters", +} + + +def _friendly_error(err): + field = err["loc"][-1] if err.get("loc") else "" + key = (field, err.get("type", "").replace("string_", "")) + if key in _FRIENDLY_ERRORS: + return _FRIENDLY_ERRORS[key] + msg = err.get("msg", "Invalid input") + prefix = "Value error, " + return msg[len(prefix):] if msg.startswith(prefix) else msg + + +@app.exception_handler(RequestValidationError) +async def on_validation_error(request: Request, exc: RequestValidationError): + errors = [_friendly_error(e) for e in exc.errors()] + path = request.url.path + page = _AUTH_FORM_PAGES.get(path) + if page is None and path.startswith("/auth/reset-password/"): + page = ("reset_password.html", "Set New Password") + if page: + template_name, title = page + context = {**base_seo_context(request, title=title, robots="noindex,nofollow"), "request": request, "errors": errors} + try: + form = await request.form() + context.update({k: v for k, v in form.items() if isinstance(v, str)}) + except Exception: + pass + if "token" in request.path_params: + context["token"] = request.path_params["token"] + return templates.TemplateResponse(request, template_name, context, status_code=400) + referer = request.headers.get("referer") or "/feed" + return RedirectResponse(url=referer, status_code=303) + app.include_router(auth.router, prefix="/auth") app.include_router(feed.router, prefix="/feed") app.include_router(posts.router, prefix="/posts") diff --git a/devplacepy/models.py b/devplacepy/models.py index e24eeaa..8ad49dc 100644 --- a/devplacepy/models.py +++ b/devplacepy/models.py @@ -1,49 +1,161 @@ -from pydantic import BaseModel, Field -from typing import Optional -from datetime import date +from typing import Optional, Literal +from pydantic import BaseModel, Field, field_validator, model_validator +from devplacepy.constants import TOPICS -class SignupRequest(BaseModel): - username: str = Field(min_length=3, max_length=32, pattern=r"^[a-zA-Z0-9_-]+$") - email: str = Field(min_length=5, max_length=255) +class SignupForm(BaseModel): + username: str = Field(min_length=3, max_length=32) + email: str = Field(min_length=1, max_length=255) password: str = Field(min_length=6, max_length=128) - confirm_password: str = Field(min_length=6, max_length=128) + confirm_password: str = Field(min_length=1, max_length=128) + + @field_validator("username") + @classmethod + def username_chars(cls, value): + if not value.isascii() or not all(c.isalnum() or c in ("-", "_") for c in value): + raise ValueError("Username can only contain letters, numbers, hyphens, and underscores") + return value + + @field_validator("email") + @classmethod + def email_has_at(cls, value): + if "@" not in value: + raise ValueError("Valid email is required") + return value + + @model_validator(mode="after") + def passwords_match(self): + if self.password != self.confirm_password: + raise ValueError("Passwords do not match") + return self -class LoginRequest(BaseModel): - email: str = Field(min_length=5, max_length=255) +class LoginForm(BaseModel): + email: str = Field(min_length=1, max_length=255) + password: str = Field(min_length=1, max_length=128) + remember_me: str = "" + + +class ForgotPasswordForm(BaseModel): + email: str = Field(min_length=1, max_length=255) + + @field_validator("email") + @classmethod + def email_has_at(cls, value): + if "@" not in value: + raise ValueError("Valid email is required") + return value + + +class ResetPasswordForm(BaseModel): password: str = Field(min_length=6, max_length=128) - remember_me: bool = False + confirm_password: str = Field(min_length=1, max_length=128) + + @model_validator(mode="after") + def passwords_match(self): + if self.password != self.confirm_password: + raise ValueError("Passwords do not match") + return self -class PostCreate(BaseModel): - content: str = Field(min_length=1, max_length=2000) - title: Optional[str] = Field(default=None, max_length=500) - topic: str = Field(pattern=r"^(devlog|showcase|question|rant|fun|random)$") - project_uid: Optional[str] = None +class PostForm(BaseModel): + content: str = Field(min_length=10, max_length=2000) + title: str = Field(default="", max_length=500) + topic: str = "random" + project_uid: str = "" + attachment_uids: list[str] = [] + + @field_validator("topic") + @classmethod + def valid_topic(cls, value): + return value if value in TOPICS else "random" -class CommentCreate(BaseModel): - content: str = Field(min_length=1, max_length=1000) - parent_uid: Optional[str] = None +class PostEditForm(BaseModel): + content: str = Field(min_length=10, max_length=2000) + title: str = Field(default="", max_length=500) + topic: str = "random" + + @field_validator("topic") + @classmethod + def valid_topic(cls, value): + return value if value in TOPICS else "random" -class ProjectCreate(BaseModel): +class CommentForm(BaseModel): + content: str = Field(min_length=3, max_length=1000) + target_uid: str = "" + post_uid: str = "" + target_type: Literal["post", "project", "news", "bug", "gist"] = "post" + parent_uid: str = "" + attachment_uids: list[str] = [] + + @model_validator(mode="after") + def require_target(self): + if not (self.target_uid or self.post_uid): + raise ValueError("A target is required") + return self + + +class ProjectForm(BaseModel): title: str = Field(min_length=1, max_length=200) description: str = Field(min_length=1, max_length=5000) - release_date: Optional[date] = None - demo_date: Optional[date] = None - project_type: str = Field(pattern=r"^(game|game_asset|software|mobile_app|website)$") + release_date: str = "" + demo_date: str = "" + project_type: Literal["game", "game_asset", "software", "mobile_app", "website"] = "software" platforms: str = Field(default="", max_length=500) + status: str = Field(default="In Development", max_length=100) + attachment_uids: list[str] = [] -class MessageCreate(BaseModel): +class MessageForm(BaseModel): content: str = Field(min_length=1, max_length=2000) - receiver_uid: str + receiver_uid: str = Field(min_length=1) + attachment_uids: list[str] = [] -class ProfileUpdate(BaseModel): - bio: Optional[str] = Field(default=None, max_length=500) - location: Optional[str] = Field(default=None, max_length=200) - git_link: Optional[str] = Field(default=None, max_length=500) - website: Optional[str] = Field(default=None, max_length=500) +class ProfileForm(BaseModel): + bio: str = Field(default="", max_length=500) + location: str = Field(default="", max_length=200) + git_link: str = Field(default="", max_length=500) + website: str = Field(default="", max_length=500) + + +class GistForm(BaseModel): + title: str = Field(min_length=1, max_length=200) + description: str = Field(default="", max_length=5000) + source_code: str = Field(min_length=1, max_length=50000) + language: str = Field(default="plaintext", max_length=50) + attachment_uids: list[str] = [] + + +class GistEditForm(BaseModel): + title: str = Field(min_length=1, max_length=200) + description: str = Field(default="", max_length=5000) + source_code: str = Field(min_length=1, max_length=50000) + language: str = Field(default="plaintext", max_length=50) + + +class BugForm(BaseModel): + title: str = Field(min_length=1, max_length=200) + description: str = Field(min_length=1, max_length=5000) + attachment_uids: list[str] = [] + + +class VoteForm(BaseModel): + value: int + + @field_validator("value") + @classmethod + def valid_value(cls, value): + if value not in (1, -1): + raise ValueError("value must be 1 or -1") + return value + + +class AdminRoleForm(BaseModel): + role: Literal["member", "admin"] + + +class AdminPasswordForm(BaseModel): + password: str = Field(min_length=6, max_length=128) diff --git a/devplacepy/routers/admin.py b/devplacepy/routers/admin.py index 3f5adbd..95729b7 100644 --- a/devplacepy/routers/admin.py +++ b/devplacepy/routers/admin.py @@ -1,6 +1,8 @@ import logging +from typing import Annotated from datetime import datetime -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form +from devplacepy.models import AdminRoleForm, AdminPasswordForm from fastapi.responses import HTMLResponse, RedirectResponse from devplacepy.database import get_table, db, build_pagination from devplacepy.templating import templates @@ -51,12 +53,9 @@ async def admin_users(request: Request, page: int = 1): @router.post("/users/{uid}/role") -async def admin_user_role(request: Request, uid: str): +async def admin_user_role(request: Request, uid: str, data: Annotated[AdminRoleForm, Form()]): admin = require_admin(request) - form = await request.form() - role = form.get("role", "").strip().capitalize() - if role not in ("Member", "Admin"): - return RedirectResponse(url="/admin/users", status_code=302) + role = data.role.capitalize() if uid == admin["uid"]: return RedirectResponse(url="/admin/users", status_code=302) users = get_table("users") @@ -66,14 +65,10 @@ async def admin_user_role(request: Request, uid: str): @router.post("/users/{uid}/password") -async def admin_user_password(request: Request, uid: str): +async def admin_user_password(request: Request, uid: str, data: Annotated[AdminPasswordForm, Form()]): admin = require_admin(request) - form = await request.form() - password = form.get("password", "") - if len(password) < 6: - return RedirectResponse(url="/admin/users", status_code=302) users = get_table("users") - users.update({"uid": uid, "password_hash": hash_password(password)}, ["uid"]) + users.update({"uid": uid, "password_hash": hash_password(data.password)}, ["uid"]) logger.info(f"Admin {admin['username']} changed password for user {uid}") return RedirectResponse(url="/admin/users", status_code=302) diff --git a/devplacepy/routers/auth.py b/devplacepy/routers/auth.py index e4a7427..79b77d7 100644 --- a/devplacepy/routers/auth.py +++ b/devplacepy/routers/auth.py @@ -1,13 +1,15 @@ import hashlib import secrets import logging +from typing import Annotated from datetime import datetime, timedelta, timezone -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form from fastapi.responses import RedirectResponse, HTMLResponse from devplacepy.database import get_table from devplacepy.templating import templates from devplacepy.utils import hash_password, verify_password, create_session, generate_uid, get_current_user from devplacepy.seo import base_seo_context +from devplacepy.models import SignupForm, LoginForm, ForgotPasswordForm, ResetPasswordForm logger = logging.getLogger(__name__) router = APIRouter() @@ -42,25 +44,12 @@ async def login_page(request: Request): @router.post("/signup") -async def signup(request: Request): - form = await request.form() - username = form.get("username", "").strip() - email = form.get("email", "").strip().lower() - password = form.get("password", "") - confirm_password = form.get("confirm_password", "") +async def signup(request: Request, data: Annotated[SignupForm, Form()]): + username = data.username + email = data.email.strip().lower() + password = data.password errors = [] - if len(username) < 3 or len(username) > 32: - errors.append("Username must be between 3 and 32 characters") - if not username.isascii() or not all(c.isalnum() or c in ("-", "_") for c in username): - errors.append("Username can only contain letters, numbers, hyphens, and underscores") - if not email or "@" not in email or len(email) > 255: - errors.append("Valid email is required") - if len(password) < 6: - errors.append("Password must be at least 6 characters") - if password != confirm_password: - errors.append("Passwords do not match") - users = get_table("users") if users.find_one(username=username): errors.append("Username already taken") @@ -109,18 +98,14 @@ async def signup(request: Request): @router.post("/login") -async def login(request: Request): - form = await request.form() - email = form.get("email", "").strip().lower() - password = form.get("password", "") - remember_me = form.get("remember_me") == "on" +async def login(request: Request, data: Annotated[LoginForm, Form()]): + email = data.email.strip().lower() + password = data.password + remember_me = data.remember_me == "on" errors = [] - if not email or not password: - errors.append("Email and password are required") - users = get_table("users") - user = users.find_one(email=email) if not errors else None + user = users.find_one(email=email) if not user or not verify_password(password, user["password_hash"]): errors.append("Invalid email or password") @@ -151,18 +136,9 @@ async def forgot_password_page(request: Request): @router.post("/forgot-password") -async def forgot_password(request: Request): - form = await request.form() - email = form.get("email", "").strip().lower() - errors = [] - if not email or "@" not in email: - errors.append("Valid email is required") - +async def forgot_password(request: Request, data: Annotated[ForgotPasswordForm, Form()]): + email = data.email.strip().lower() seo_ctx = base_seo_context(request, title="Reset Password", robots="noindex,nofollow") - if errors: - return templates.TemplateResponse(request, "forgot_password.html", { - **seo_ctx, "request": request, "errors": errors, - }) users = get_table("users") user = users.find_one(email=email) @@ -198,22 +174,9 @@ async def reset_password_page(request: Request, token: str): @router.post("/reset-password/{token}") -async def reset_password(request: Request, token: str): - form = await request.form() - password = form.get("password", "") - confirm = form.get("confirm_password", "") - +async def reset_password(request: Request, token: str, data: Annotated[ResetPasswordForm, Form()]): + password = data.password errors = [] - if len(password) < 6: - errors.append("Password must be at least 6 characters") - if password != confirm: - errors.append("Passwords do not match") - - if errors: - seo_ctx = base_seo_context(request, title="Set New Password", robots="noindex,nofollow") - return templates.TemplateResponse(request, "reset_password.html", { - **seo_ctx, "request": request, "token": token, "errors": errors, - }) resets = get_table("password_resets") users = get_table("users") diff --git a/devplacepy/routers/bugs.py b/devplacepy/routers/bugs.py index 3d8a09b..0e2727b 100644 --- a/devplacepy/routers/bugs.py +++ b/devplacepy/routers/bugs.py @@ -1,7 +1,9 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form from fastapi.responses import HTMLResponse, RedirectResponse +from devplacepy.models import BugForm from devplacepy.database import get_table, load_comments from devplacepy.attachments import get_attachments_batch, link_attachments from devplacepy.templating import templates @@ -52,14 +54,10 @@ async def bugs_page(request: Request): @router.post("/create") -async def create_bug(request: Request): +async def create_bug(request: Request, data: Annotated[BugForm, Form()]): user = require_user(request) - form = await request.form() - title = form.get("title", "").strip() - description = form.get("description", "").strip() - - if not title or not description: - return RedirectResponse(url="/bugs", status_code=302) + title = data.title.strip() + description = data.description.strip() bugs_table = get_table("bug_reports") bug_uid = generate_uid() @@ -72,9 +70,8 @@ async def create_bug(request: Request): "created_at": datetime.now(timezone.utc).isoformat(), }) - attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else [] - if attachment_uids: - link_attachments(attachment_uids, "bug", bug_uid) + if data.attachment_uids: + link_attachments(data.attachment_uids, "bug", bug_uid) create_mention_notifications(description, user["uid"], f"/bugs?highlight={bug_uid}") logger.info(f"Bug report created by {user['username']}: {title}") diff --git a/devplacepy/routers/comments.py b/devplacepy/routers/comments.py index 07e19de..0a280b2 100644 --- a/devplacepy/routers/comments.py +++ b/devplacepy/routers/comments.py @@ -1,11 +1,13 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form from fastapi.responses import RedirectResponse from devplacepy.database import get_table, resolve_by_slug from devplacepy.attachments import link_attachments, delete_target_attachments from devplacepy.templating import clear_unread_cache from devplacepy.utils import generate_uid, require_user, create_mention_notifications +from devplacepy.models import CommentForm logger = logging.getLogger(__name__) router = APIRouter() @@ -32,23 +34,15 @@ def resolve_target_redirect(target_type, target_uid): @router.post("/create") -async def create_comment(request: Request): +async def create_comment(request: Request, data: Annotated[CommentForm, Form()]): user = require_user(request) - form = await request.form() - content = form.get("content", "").strip() - target_uid = form.get("target_uid") or form.get("post_uid", "") - target_type = form.get("target_type", "post") - parent_uid = form.get("parent_uid", "") + content = data.content.strip() + target_uid = data.target_uid or data.post_uid + target_type = data.target_type + parent_uid = data.parent_uid redirect_url = resolve_target_redirect(target_type, target_uid) - if not content or not target_uid: - return RedirectResponse(url=redirect_url, status_code=302) - if len(content) < 3: - return RedirectResponse(url=redirect_url, status_code=302) - if len(content) > 1000: - return RedirectResponse(url=redirect_url, status_code=302) - comment_uid = generate_uid() insert = { "uid": comment_uid, @@ -63,9 +57,8 @@ async def create_comment(request: Request): insert["post_uid"] = target_uid get_table("comments").insert(insert) - attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else [] - if attachment_uids: - link_attachments(attachment_uids, "comment", comment_uid) + if data.attachment_uids: + link_attachments(data.attachment_uids, "comment", comment_uid) badges = get_table("badges") existing = badges.find_one(user_uid=user["uid"], badge_name="First Comment") diff --git a/devplacepy/routers/gists.py b/devplacepy/routers/gists.py index 616695c..95b2100 100644 --- a/devplacepy/routers/gists.py +++ b/devplacepy/routers/gists.py @@ -1,6 +1,8 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request, HTTPException +from fastapi import APIRouter, Request, HTTPException, Form +from devplacepy.models import GistForm, GistEditForm from fastapi.responses import HTMLResponse, RedirectResponse from devplacepy.database import get_table, load_comments, get_vote_counts, resolve_by_slug, db from devplacepy.attachments import get_attachments, delete_target_attachments @@ -127,24 +129,12 @@ async def gist_detail(request: Request, gist_slug: str): @router.post("/create") -async def create_gist(request: Request): +async def create_gist(request: Request, data: Annotated[GistForm, Form()]): user = require_user(request) - form = await request.form() - title = form.get("title", "").strip() - description = form.get("description", "").strip() - source_code = form.get("source_code", "").strip() - language = form.get("language", "plaintext") - - if not title: - return RedirectResponse(url="/gists", status_code=302) - if len(title) > 200: - return RedirectResponse(url="/gists", status_code=302) - if not source_code: - return RedirectResponse(url="/gists", status_code=302) - if len(source_code) > 50000: - return RedirectResponse(url="/gists", status_code=302) - if len(description) > 5000: - return RedirectResponse(url="/gists", status_code=302) + title = data.title.strip() + description = data.description.strip() + source_code = data.source_code.strip() + language = data.language valid_languages = {l[0] for l in LANGUAGES} if language not in valid_languages: @@ -165,10 +155,8 @@ async def create_gist(request: Request): "created_at": datetime.now(timezone.utc).isoformat(), }) - attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else [] - from devplacepy.attachments import link_attachments - link_attachments(attachment_uids, "gist", uid) + link_attachments(data.attachment_uids, "gist", uid) create_mention_notifications(description or "", user["uid"], f"/gists/{gist_slug}") @@ -177,25 +165,17 @@ async def create_gist(request: Request): @router.post("/edit/{gist_slug}") -async def edit_gist(request: Request, gist_slug: str): +async def edit_gist(request: Request, gist_slug: str, data: Annotated[GistEditForm, Form()]): user = require_user(request) gists = get_table("gists") gist = resolve_by_slug(gists, gist_slug) if not gist or gist["user_uid"] != user["uid"]: return RedirectResponse(url="/gists", status_code=302) - form = await request.form() - title = form.get("title", "").strip() - description = form.get("description", "").strip() - source_code = form.get("source_code", "").strip() - language = form.get("language", "plaintext") - - if not title or len(title) > 200: - return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302) - if not source_code or len(source_code) > 50000: - return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302) - if len(description) > 5000: - return RedirectResponse(url=f"/gists/{gist['slug'] or gist['uid']}", status_code=302) + title = data.title.strip() + description = data.description.strip() + source_code = data.source_code.strip() + language = data.language valid_languages = {l[0] for l in LANGUAGES} if language not in valid_languages: diff --git a/devplacepy/routers/messages.py b/devplacepy/routers/messages.py index aa6fcdb..156f6b0 100644 --- a/devplacepy/routers/messages.py +++ b/devplacepy/routers/messages.py @@ -1,6 +1,8 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form +from devplacepy.models import MessageForm from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from devplacepy.database import get_table, db from devplacepy.attachments import get_attachments_batch @@ -145,43 +147,39 @@ async def search_users(request: Request, q: str = ""): @router.post("/send") -async def send_message(request: Request): +async def send_message(request: Request, data: Annotated[MessageForm, Form()]): user = require_user(request) - form = await request.form() - content = form.get("content", "").strip() - receiver_uid = form.get("receiver_uid", "") + content = data.content.strip() + receiver_uid = data.receiver_uid - if content and receiver_uid: - messages_table = get_table("messages") - msg_uid = generate_uid() - messages_table.insert({ - "uid": msg_uid, - "sender_uid": user["uid"], - "receiver_uid": receiver_uid, - "content": content, + messages_table = get_table("messages") + msg_uid = generate_uid() + messages_table.insert({ + "uid": msg_uid, + "sender_uid": user["uid"], + "receiver_uid": receiver_uid, + "content": content, + "read": False, + "created_at": datetime.now(timezone.utc).isoformat(), + }) + + from devplacepy.attachments import link_attachments + link_attachments(data.attachment_uids, "message", msg_uid) + + if user["uid"] != receiver_uid: + notifications = get_table("notifications") + notifications.insert({ + "uid": generate_uid(), + "user_uid": receiver_uid, + "type": "message", + "message": f"{user['username']} sent you a message", + "related_uid": user["uid"], "read": False, "created_at": datetime.now(timezone.utc).isoformat(), }) + clear_unread_cache(receiver_uid) - attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else [] + create_mention_notifications(content, user["uid"], f"/messages?with_uid={receiver_uid}") - from devplacepy.attachments import link_attachments - link_attachments(attachment_uids, "message", msg_uid) - - if user["uid"] != receiver_uid: - notifications = get_table("notifications") - notifications.insert({ - "uid": generate_uid(), - "user_uid": receiver_uid, - "type": "message", - "message": f"{user['username']} sent you a message", - "related_uid": user["uid"], - "read": False, - "created_at": datetime.now(timezone.utc).isoformat(), - }) - clear_unread_cache(receiver_uid) - - create_mention_notifications(content, user["uid"], f"/messages?with_uid={receiver_uid}") - - logger.info(f"Message {msg_uid} sent from {user['username']} to {receiver_uid}") - return RedirectResponse(url=f"/messages?with_uid={receiver_uid}", status_code=302) + logger.info(f"Message {msg_uid} sent from {user['username']} to {receiver_uid}") + return RedirectResponse(url=f"/messages?with_uid={receiver_uid}", status_code=302) diff --git a/devplacepy/routers/posts.py b/devplacepy/routers/posts.py index deaed7d..87d450f 100644 --- a/devplacepy/routers/posts.py +++ b/devplacepy/routers/posts.py @@ -1,6 +1,7 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request, HTTPException +from fastapi import APIRouter, Request, HTTPException, Form from fastapi.responses import RedirectResponse, HTMLResponse from devplacepy.constants import TOPICS from devplacepy.database import get_table, get_comment_counts_by_post_uids, load_comments, db, resolve_by_slug @@ -8,36 +9,24 @@ from devplacepy.templating import templates from devplacepy.utils import generate_uid, get_current_user, require_user, time_ago, make_combined_slug, create_mention_notifications from devplacepy.seo import base_seo_context, site_url, website_schema, breadcrumb_schema, discussion_forum_posting, combine, truncate from devplacepy.attachments import get_attachments, link_attachments, delete_target_attachments, save_inline_image, delete_inline_image +from devplacepy.models import PostForm, PostEditForm logger = logging.getLogger(__name__) router = APIRouter() @router.post("/create") -async def create_post(request: Request): +async def create_post(request: Request, data: Annotated[PostForm, Form()]): user = require_user(request) - form = await request.form() - content = form.get("content", "").strip() - title = form.get("title", "").strip() - topic = form.get("topic", "random") - project_uid = form.get("project_uid", "") - - errors = [] - if not content: - errors.append("Content is required") - if content and len(content) < 10: - errors.append("Content must be at least 10 characters") - if len(content) > 2000: - errors.append("Content too long (max 2000 characters)") - if topic not in TOPICS: - topic = "random" - - if errors: - return RedirectResponse(url="/feed", status_code=302) + content = data.content.strip() + title = data.title.strip() + topic = data.topic + project_uid = data.project_uid image_filename = None + form = await request.form() image_file = form.get("image") - if image_file and hasattr(image_file, "filename") and image_file.filename: + if image_file is not None and hasattr(image_file, "filename") and image_file.filename: image_filename = save_inline_image(await image_file.read(), image_file.filename) if image_filename: content += f"\n\n![](/static/uploads/{image_filename})" @@ -69,9 +58,8 @@ async def create_post(request: Request): "created_at": datetime.now(timezone.utc).isoformat(), }) - attachment_uids = form.getlist("attachment_uids") - if attachment_uids: - link_attachments(attachment_uids, "post", uid) + if data.attachment_uids: + link_attachments(data.attachment_uids, "post", uid) create_mention_notifications(content, user["uid"], f"/posts/{post_slug}") logger.info(f"Post {uid} created by {user['username']}") @@ -146,36 +134,18 @@ async def view_post(request: Request, post_slug: str): @router.post("/edit/{post_slug}") -async def edit_post(request: Request, post_slug: str): +async def edit_post(request: Request, post_slug: str, data: Annotated[PostEditForm, Form()]): user = require_user(request) - form = await request.form() - content = form.get("content", "").strip() - title = form.get("title", "").strip() - topic = form.get("topic", "random") - - errors = [] - if not content: - errors.append("Content is required") - if content and len(content) < 10: - errors.append("Content must be at least 10 characters") - if len(content) > 2000: - errors.append("Content too long (max 2000 characters)") - if topic not in TOPICS: - topic = "random" - posts = get_table("posts") post = resolve_by_slug(posts, post_slug) if not post or post["user_uid"] != user["uid"]: return RedirectResponse(url="/feed", status_code=302) - if errors: - return RedirectResponse(url=f"/posts/{post['slug'] or post['uid']}", status_code=302) - posts.update({ "uid": post["uid"], - "content": content, - "title": title or None, - "topic": topic, + "content": data.content.strip(), + "title": data.title.strip() or None, + "topic": data.topic, }, ["uid"]) logger.info(f"Post {post['uid']} edited by {user['username']}") diff --git a/devplacepy/routers/profile.py b/devplacepy/routers/profile.py index 1219d34..6a886d1 100644 --- a/devplacepy/routers/profile.py +++ b/devplacepy/routers/profile.py @@ -1,5 +1,7 @@ import logging -from fastapi import APIRouter, Request +from typing import Annotated +from fastapi import APIRouter, Request, Form +from devplacepy.models import ProfileForm from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from devplacepy.database import get_table, db from devplacepy.templating import templates @@ -10,6 +12,22 @@ logger = logging.getLogger(__name__) router = APIRouter() +@router.get("/search") +async def search_users(request: Request, q: str = ""): + require_user(request) + if not q or len(q) < 1: + return JSONResponse({"results": []}) + if "users" in db.tables: + rows = db.query( + "SELECT uid, username FROM users WHERE username LIKE :q LIMIT 10", + q=f"%{q}%", + ) + results = [{"uid": r["uid"], "username": r["username"]} for r in rows] + else: + results = [] + return JSONResponse({"results": results}) + + @router.get("/{username}", response_class=HTMLResponse) async def profile_page(request: Request, username: str, tab: str = "posts"): current_user = get_current_user(request) @@ -95,37 +113,16 @@ async def profile_page(request: Request, username: str, tab: str = "posts"): }) -@router.get("/search") -async def search_users(request: Request, q: str = ""): - require_user(request) - if not q or len(q) < 1: - return JSONResponse({"results": []}) - if "users" in db.tables: - rows = db.query( - "SELECT uid, username FROM users WHERE username LIKE :q LIMIT 10", - q=f"%{q}%", - ) - results = [{"uid": r["uid"], "username": r["username"]} for r in rows] - else: - results = [] - return JSONResponse({"results": results}) - - @router.post("/update") -async def update_profile(request: Request): +async def update_profile(request: Request, data: Annotated[ProfileForm, Form()]): user = require_user(request) - form = await request.form() - bio = form.get("bio", "").strip() - location = form.get("location", "").strip() - git_link = form.get("git_link", "").strip() - website = form.get("website", "").strip() users = get_table("users") users.update({ "uid": user["uid"], - "bio": bio, - "location": location, - "git_link": git_link, - "website": website, + "bio": data.bio.strip(), + "location": data.location.strip(), + "git_link": data.git_link.strip(), + "website": data.website.strip(), }, ["uid"]) logger.info(f"Profile updated for {user['username']}") diff --git a/devplacepy/routers/projects.py b/devplacepy/routers/projects.py index 2a091ec..6a817b9 100644 --- a/devplacepy/routers/projects.py +++ b/devplacepy/routers/projects.py @@ -1,6 +1,8 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request, HTTPException +from fastapi import APIRouter, Request, HTTPException, Form +from devplacepy.models import ProjectForm from fastapi.responses import HTMLResponse, RedirectResponse from devplacepy.database import get_table, get_vote_counts, load_comments, resolve_by_slug from devplacepy.attachments import link_attachments, get_attachments, delete_target_attachments @@ -147,19 +149,10 @@ async def delete_project(request: Request, project_slug: str): @router.post("/create") -async def create_project(request: Request): +async def create_project(request: Request, data: Annotated[ProjectForm, Form()]): user = require_user(request) - form = await request.form() - title = form.get("title", "").strip() - description = form.get("description", "").strip() - release_date = form.get("release_date", "") - demo_date = form.get("demo_date", "") - project_type = form.get("project_type", "software") - platforms = form.get("platforms", "").strip() - status = form.get("status", "In Development") - - if not title or not description: - return RedirectResponse(url="/projects", status_code=302) + title = data.title.strip() + description = data.description.strip() projects = get_table("projects") uid = generate_uid() @@ -170,18 +163,17 @@ async def create_project(request: Request): "title": title, "slug": project_slug, "description": description, - "release_date": release_date or None, - "demo_date": demo_date or None, - "project_type": project_type, - "platforms": platforms, - "status": status, + "release_date": data.release_date or None, + "demo_date": data.demo_date or None, + "project_type": data.project_type, + "platforms": data.platforms.strip(), + "status": data.status, "stars": 0, "created_at": datetime.now(timezone.utc).isoformat(), }) - attachment_uids = form.getlist("attachment_uids") if hasattr(form, "getlist") else [] - if attachment_uids: - link_attachments(attachment_uids, "project", uid) + if data.attachment_uids: + link_attachments(data.attachment_uids, "project", uid) create_mention_notifications(description, user["uid"], f"/projects/{project_slug}") logger.info(f"Project {uid} created by {user['username']}") diff --git a/devplacepy/routers/votes.py b/devplacepy/routers/votes.py index 6918f9a..470dc1a 100644 --- a/devplacepy/routers/votes.py +++ b/devplacepy/routers/votes.py @@ -1,23 +1,21 @@ import logging +from typing import Annotated from datetime import datetime, timezone -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Form from fastapi.responses import RedirectResponse from devplacepy.database import get_table from devplacepy.templating import clear_unread_cache from devplacepy.utils import generate_uid, require_user +from devplacepy.models import VoteForm logger = logging.getLogger(__name__) router = APIRouter() @router.post("/{target_type}/{target_uid}") -async def vote(request: Request, target_type: str, target_uid: str): +async def vote(request: Request, target_type: str, target_uid: str, data: Annotated[VoteForm, Form()]): user = require_user(request) - form = await request.form() - value = int(form.get("value", "1")) - - if value not in (1, -1): - return RedirectResponse(url="/feed", status_code=302) + value = data.value votes = get_table("votes") existing = votes.find_one(user_uid=user["uid"], target_uid=target_uid, target_type=target_type) diff --git a/tests/test_admin.py b/tests/test_admin.py index 3f841bc..ef2d45d 100644 --- a/tests/test_admin.py +++ b/tests/test_admin.py @@ -129,3 +129,59 @@ def test_admin_news_pagination(alice): next_btn.click() page.wait_for_url(f"{BASE_URL}/admin/news?page=2", wait_until="domcontentloaded") assert "Page 2" in page.text_content(".pagination-info") + + +def _seed_target_user(): + uid = str(uuid4()) + get_table("users").insert({ + "uid": uid, "username": f"target_{uid[:8]}", "email": f"target_{uid[:8]}@test.devplace", + "password_hash": "x", "role": "Member", "is_active": True, + "created_at": datetime.now(timezone.utc).isoformat(), + }) + return uid + + +def test_admin_settings_save(alice): + page, _ = alice + page.goto(f"{BASE_URL}/admin/settings", wait_until="domcontentloaded") + assert page.is_visible("#site_name") + newval = f"model-{uuid4().hex[:8]}" + page.fill("#news_ai_model", newval) + page.click("button:has-text('Save Settings')") + page.wait_for_url("**/admin/settings", wait_until="domcontentloaded") + assert page.locator("#news_ai_model").input_value() == newval + + +def test_admin_change_user_role(alice): + page, _ = alice + uid = _seed_target_user() + page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded") + sel = f"form[action='/admin/users/{uid}/role'] select[name='role']" + page.locator(sel).wait_for(state="visible") + page.select_option(sel, "admin") + page.wait_for_url("**/admin/users**", wait_until="domcontentloaded") + page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded") + assert page.locator(sel).input_value() == "admin" + + +def test_admin_toggle_user_active(alice): + page, _ = alice + uid = _seed_target_user() + page.goto(f"{BASE_URL}/admin/users", wait_until="domcontentloaded") + btn = f"form[action='/admin/users/{uid}/toggle'] button" + before = page.locator(btn).inner_text() + page.locator(btn).click() + page.wait_for_url("**/admin/users**", wait_until="domcontentloaded") + assert page.locator(btn).inner_text() != before + + +def test_admin_news_featured_toggle(alice): + page, _ = alice + seed_admin_news() + page.goto(f"{BASE_URL}/admin/news", wait_until="domcontentloaded") + toggle_sel = "form[action$='/toggle'] button.admin-toggle-switch" + was_active = "active" in (page.locator(toggle_sel).first.get_attribute("class") or "") + page.locator(toggle_sel).first.click() + page.wait_for_url(f"{BASE_URL}/admin/news", wait_until="domcontentloaded") + is_active = "active" in (page.locator(toggle_sel).first.get_attribute("class") or "") + assert is_active != was_active diff --git a/tests/test_auth.py b/tests/test_auth.py index b60dfce..4a2529c 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,4 +1,66 @@ +import hashlib +import time +from datetime import datetime, timedelta, timezone from tests.conftest import BASE_URL +from devplacepy.database import get_table +from devplacepy.utils import hash_password, generate_uid + + +def test_forgot_password_page_loads(page, app_server): + page.goto(f"{BASE_URL}/auth/forgot-password", wait_until="domcontentloaded") + assert page.is_visible("input#email") + assert page.is_visible("button:has-text('Send Reset Link')") + + +def test_forgot_password_submit_shows_sent(page, app_server): + page.goto(f"{BASE_URL}/auth/forgot-password", wait_until="domcontentloaded") + page.fill("#email", "anybody@example.com") + page.click("button:has-text('Send Reset Link')") + page.locator(".auth-success").wait_for(state="visible") + + +def test_reset_password_page_loads(page, app_server): + page.goto(f"{BASE_URL}/auth/reset-password/sometoken", wait_until="domcontentloaded") + assert page.is_visible("input#password") + assert page.is_visible("input#confirm_password") + + +def test_reset_password_mismatch_shows_error(page, app_server): + page.goto(f"{BASE_URL}/auth/reset-password/sometoken", wait_until="domcontentloaded") + page.fill("#password", "abcdef1") + page.fill("#confirm_password", "different1") + page.click("button:has-text('Reset Password')") + page.locator(".auth-error").wait_for(state="visible") + + +def test_reset_password_full_flow(page, app_server): + uname = f"reset_{int(time.time() * 1000)}" + email = f"{uname}@t.dev" + uid = generate_uid() + get_table("users").insert({ + "uid": uid, "username": uname, "email": email, + "password_hash": hash_password("oldpass123"), + "bio": "", "location": "", "git_link": "", "website": "", + "role": "Member", "is_active": True, "level": 1, "xp": 0, "stars": 0, + "created_at": datetime.now(timezone.utc).isoformat(), + }) + token = f"tok{uid[:8]}" + get_table("password_resets").insert({ + "uid": generate_uid(), "user_uid": uid, + "token": hashlib.sha256(token.encode()).hexdigest(), + "expires_at": (datetime.now(timezone.utc) + timedelta(hours=1)).isoformat(), + "used": False, "created_at": datetime.now(timezone.utc).isoformat(), + }) + page.goto(f"{BASE_URL}/auth/reset-password/{token}", wait_until="domcontentloaded") + page.fill("#password", "newpass456") + page.fill("#confirm_password", "newpass456") + page.click("button:has-text('Reset Password')") + page.wait_for_url("**/auth/login", wait_until="domcontentloaded") + page.fill("#email", email) + page.fill("#password", "newpass456") + page.click("button:has-text('Sign in')") + page.wait_for_url("**/feed", wait_until="domcontentloaded") + assert "/feed" in page.url def test_signup_page_loads(page, app_server): diff --git a/tests/test_avatar.py b/tests/test_avatar.py index ad89efc..7f507c7 100644 --- a/tests/test_avatar.py +++ b/tests/test_avatar.py @@ -1,4 +1,13 @@ +import requests from devplacepy.avatar import avatar_url, generate_avatar_svg +from tests.conftest import BASE_URL + + +def test_avatar_endpoint_serves_svg(app_server): + r = requests.get(f"{BASE_URL}/avatar/multiavatar/alice_test?size=64") + assert r.status_code == 200 + assert "svg" in r.headers.get("content-type", "").lower() + assert "= 1 + + +def test_mention_autocomplete(alice): + page, _ = alice + create_post(page, "random", "Post for mention autocomplete here") + ta = page.locator(".comment-form textarea[data-mention]").first + ta.click() + ta.press_sequentially("@bob") + item = page.locator(".mention-dropdown-item").first + item.wait_for(state="visible") + assert "bob" in item.inner_text().lower() + + +def test_emoji_picker_opens(alice): + page, _ = alice + create_post(page, "random", "Post for emoji picker here") + page.locator(".emoji-toggle-btn").first.click() + page.locator(".emoji-picker-wrapper").first.wait_for(state="visible") + assert page.locator("emoji-picker").first.is_visible() + + +def test_attachment_upload_ui(alice): + import io + from PIL import Image + page, _ = alice + create_post(page, "random", "Post for attachment upload UI") + buf = io.BytesIO() + Image.new("RGB", (4, 4), (0, 128, 255)).save(buf, "PNG") + file_input = page.locator(".comment-form .attachment-upload-container input[type='file']").first + file_input.set_input_files({"name": "pic.png", "mimeType": "image/png", "buffer": buf.getvalue()}) + page.locator(".attachment-preview[data-uid]").first.wait_for(state="visible", timeout=15000) diff --git a/tests/test_profile.py b/tests/test_profile.py index aaf0b8d..9a921f8 100644 --- a/tests/test_profile.py +++ b/tests/test_profile.py @@ -1,6 +1,14 @@ from tests.conftest import BASE_URL +def test_profile_search_returns_results(alice): + page, _ = alice + resp = page.request.get(f"{BASE_URL}/profile/search?q=bob") + assert resp.status == 200 + data = resp.json() + assert any("bob" in u["username"] for u in data["results"]) + + def test_profile_page_loads(alice): page, user = alice page.goto(f"{BASE_URL}/profile/{user['username']}", wait_until="domcontentloaded") diff --git a/tests/test_projects.py b/tests/test_projects.py index 3a23f0f..ffcaf30 100644 --- a/tests/test_projects.py +++ b/tests/test_projects.py @@ -1,6 +1,41 @@ from tests.conftest import BASE_URL, assert_share_copies +def _create_project(page, title): + page.goto(f"{BASE_URL}/projects", wait_until="domcontentloaded") + page.locator("#create-project-btn").click() + page.fill("#title", title) + page.fill("#description", "Project for testing") + page.fill("#release_date", "2026-06-01") + page.check("input[value='game']") + page.locator("#platforms-input").fill("PC") + page.locator("#platforms-input").press("Enter") + page.click("button:has-text('Create Project')") + page.wait_for_url(f"{BASE_URL}/projects/*", wait_until="domcontentloaded") + + +def test_project_vote(alice): + page, _ = alice + _create_project(page, "Votable Project") + star = "form[action*='/votes/project/'] button" + before = int(page.locator(star).first.inner_text().strip("☆ ")) + page.locator(star).first.click() + page.wait_for_url(f"{BASE_URL}/projects/*", wait_until="domcontentloaded") + after = int(page.locator(star).first.inner_text().strip("☆ ")) + assert after == before + 1 + + +def test_delete_own_project(alice): + page, _ = alice + _create_project(page, "Deletable Project XYZ") + proj_url = page.url + page.once("dialog", lambda d: d.accept()) + page.locator("form[action*='/projects/delete/'] button").click() + page.wait_for_url(f"{BASE_URL}/projects", wait_until="domcontentloaded") + resp = page.goto(proj_url, wait_until="domcontentloaded") + assert resp.status == 404 + + def test_project_detail_share_button(alice): page, _ = alice page.goto(f"{BASE_URL}/projects", wait_until="domcontentloaded")