235 lines
8.4 KiB
Python
Raw Normal View History

2026-05-10 09:08:12 +02:00
import logging
2026-05-11 00:41:41 +02:00
import aiofiles
2026-05-10 09:08:12 +02:00
from datetime import datetime
2026-05-11 00:41:41 +02:00
from pathlib import Path
2026-05-11 03:14:43 +02:00
from fastapi import APIRouter, Request, HTTPException
2026-05-10 09:08:12 +02:00
from fastapi.responses import RedirectResponse, HTMLResponse
2026-05-11 05:30:51 +02:00
from devplacepy.database import get_table, get_comment_counts_by_post_uids, db
2026-05-10 09:08:12 +02:00
from devplacepy.templating import templates
2026-05-11 00:41:41 +02:00
from devplacepy.config import STATIC_DIR
2026-05-11 05:30:51 +02:00
from devplacepy.utils import generate_uid, get_current_user, require_user, time_ago, slugify
from devplacepy.seo import base_seo_context, site_url, website_schema, breadcrumb_schema, discussion_forum_posting, combine, truncate
2026-05-10 09:08:12 +02:00
# Module-level logger used by the post handlers below (creation, edits,
# deletions and image-upload problems).
logger = logging.getLogger(__name__)
# Router on which the post endpoints in this module are registered.
# NOTE(review): handlers redirect to "/posts/{uid}", so this router is
# presumably mounted under "/posts" -- confirm where it is included.
router = APIRouter()
async def _save_post_image(image_file):
    """Store an uploaded image under ``STATIC_DIR / "uploads"``.

    Returns the generated file name, or ``None`` when no file was submitted,
    the payload is larger than 5 MiB, the extension is not allowed, or
    saving fails. Upload problems are logged and never raised, so a broken
    image cannot prevent the post itself from being created.
    """
    # Plain string fields (or a missing field) carry no usable filename.
    if not image_file or not getattr(image_file, "filename", None):
        return None

    # The file extension allow-list is the only type check performed here.
    # NOTE(review): ".svg" files can embed script -- confirm uploads are
    # served in a way that makes this safe.
    allowed = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}
    try:
        payload = await image_file.read()
        if len(payload) > 5 * 1024 * 1024:
            logger.warning("Image too large: %s", image_file.filename)
            return None
        ext = Path(image_file.filename).suffix.lower()
        if ext not in allowed:
            logger.warning("Unsupported image type: %s", ext)
            return None
        upload_dir = STATIC_DIR / "uploads"
        upload_dir.mkdir(parents=True, exist_ok=True)
        # Store under a generated name; the client-supplied name is only
        # used for its extension.
        image_filename = f"{generate_uid()}{ext}"
        async with aiofiles.open(str(upload_dir / image_filename), "wb") as f:
            await f.write(payload)
    except Exception as exc:
        # Best-effort boundary: report the failure and continue without an
        # image, so the post never references a file that was not written.
        logger.warning("Image upload failed: %s", exc)
        return None
    logger.info("Image saved: %s", image_filename)
    return image_filename


@router.post("/create")
async def create_post(request: Request):
    """Create a post from the submitted form and redirect to it.

    Form fields read: ``content`` (required, 10-2000 characters after
    stripping), ``title``, ``topic`` (unknown values fall back to
    ``"random"``), ``project_uid`` and an optional ``image`` upload.

    On invalid content the request is redirected to ``/feed`` without
    creating anything. Otherwise the post row is inserted, a "First Post"
    badge is added if the user does not have one yet, and the response
    redirects to ``/posts/{uid}``.
    """
    user = require_user(request)
    form = await request.form()
    content = form.get("content", "").strip()
    title = form.get("title", "").strip()
    topic = form.get("topic", "random")
    project_uid = form.get("project_uid", "")

    errors = []
    if not content:
        errors.append("Content is required")
    if content and len(content) < 10:
        errors.append("Content must be at least 10 characters")
    if len(content) > 2000:
        errors.append("Content too long (max 2000 characters)")
    if topic not in ("devlog", "showcase", "question", "rant", "fun", "random", "signals"):
        topic = "random"
    if errors:
        # The messages are not surfaced to the client; the form simply
        # bounces back to the feed.
        return RedirectResponse(url="/feed", status_code=302)

    # Derive the slug from what the user typed, before the image markdown is
    # appended, so the upload path never leaks into the slug of a short post.
    post_slug = slugify(title) if title else slugify(content[:50])

    image_filename = await _save_post_image(form.get("image"))
    if image_filename:
        content += f"\n\n![](/static/uploads/{image_filename})"

    posts = get_table("posts")
    uid = generate_uid()
    posts.insert({
        "uid": uid,
        "user_uid": user["uid"],
        "title": title or None,
        "slug": post_slug,
        "content": content,
        "topic": topic,
        "project_uid": project_uid or None,
        "image": image_filename,
        "stars": 0,
        "created_at": datetime.utcnow().isoformat(),
    })

    # Award the "First Post" badge once per user.
    badges = get_table("badges")
    if not badges.find_one(user_uid=user["uid"], badge_name="First Post"):
        badges.insert({
            "uid": generate_uid(),
            "user_uid": user["uid"],
            "badge_name": "First Post",
            "created_at": datetime.utcnow().isoformat(),
        })

    logger.info("Post %s created by %s", uid, user["username"])
    return RedirectResponse(url=f"/posts/{uid}", status_code=302)
@router.get("/{post_uid}", response_class=HTMLResponse)
async def view_post(request: Request, post_uid: str):
    """Render a single post with its comment tree and related posts.

    ``post_uid`` is matched against the post's ``uid`` first and, failing
    that, against its ``slug``. Once the post is found, every further lookup
    uses the post's real uid so that slug URLs show the same comments and
    related posts as uid URLs.

    Raises:
        HTTPException: 404 when no post matches by uid or slug.
    """
    user = get_current_user(request)
    posts = get_table("posts")
    post = posts.find_one(uid=post_uid) or posts.find_one(slug=post_uid)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    # The URL segment may have been a slug; comments and the related-posts
    # exclusion are keyed by the actual uid.
    resolved_uid = post["uid"]

    users_table = get_table("users")
    author = users_table.find_one(uid=post["user_uid"])

    comments_table = get_table("comments")
    raw_comments = list(comments_table.find(post_uid=resolved_uid, order_by=["created_at"]))
    if raw_comments:
        from devplacepy.database import get_users_by_uids, get_vote_counts

        # Fetch authors and vote totals for all comments in one call each.
        comment_users = get_users_by_uids([c["user_uid"] for c in raw_comments])
        ups, downs = get_vote_counts([c["uid"] for c in raw_comments])
    else:
        comment_users, ups, downs = {}, {}, {}

    comment_map = {
        c["uid"]: {
            "comment": c,
            "author": comment_users.get(c["user_uid"]),
            "time_ago": time_ago(c["created_at"]),
            "votes": {
                "up": ups.get(c["uid"], 0),
                "down": downs.get(c["uid"], 0),
            },
            "children": [],
        }
        for c in raw_comments
    }

    # Attach replies to their parent; comments without a parent, or whose
    # parent is not part of this post's comments, are shown at top level.
    top_level = []
    for item in comment_map.values():
        parent_uid = item["comment"].get("parent_uid")
        if parent_uid and parent_uid in comment_map:
            comment_map[parent_uid]["children"].append(item)
        else:
            top_level.append(item)

    comment_count = len(raw_comments)
    star_count = post.get("stars", 0)
    base = site_url(request)
    seo_ctx = base_seo_context(
        request,
        title=post.get("title") or "Post",
        description=truncate(post.get("content", ""), 160),
        breadcrumbs=[
            {"name": "Home", "url": "/feed"},
            {"name": "Feed", "url": "/feed"},
            {"name": post.get("title") or "Post", "url": f"/posts/{post['uid']}"},
        ],
        og_type="article",
        schemas=[
            website_schema(base),
            discussion_forum_posting(post, author, comment_count, star_count, base),
        ],
    )

    # Up to five newest posts in the same topic, excluding the one on display.
    related_posts = []
    if "posts" in db.tables:
        rows = db.query(
            "SELECT p.*, u.username FROM posts p JOIN users u ON p.user_uid = u.uid WHERE p.topic = :t AND p.uid != :uid ORDER BY p.created_at DESC LIMIT 5",
            t=post.get("topic", ""), uid=resolved_uid,
        )
        related_posts = [
            {
                "post": r,
                "author": {"username": r["username"]},
                "time_ago": time_ago(r["created_at"]),
            }
            for r in rows
        ]

    topics = ["devlog", "showcase", "question", "rant", "fun", "random", "signals"]
    return templates.TemplateResponse("post.html", {
        **seo_ctx,
        "request": request,
        "user": user,
        "post": post,
        "author": author,
        "comments": top_level,
        "time_ago": time_ago(post["created_at"]),
        "comment_count": comment_count,
        "related_posts": related_posts,
        "topics": topics,
    })
2026-05-11 05:30:51 +02:00
2026-05-11 07:02:06 +02:00
@router.post("/edit/{post_uid}")
async def edit_post(request: Request, post_uid: str):
    """Update the content, title and topic of a post owned by the current user.

    Redirects to ``/feed`` when the post does not exist or belongs to someone
    else, and back to the post page both when the submitted content fails
    validation (nothing is changed) and after a successful update.
    """
    user = require_user(request)
    form = await request.form()
    content = form.get("content", "").strip()
    title = form.get("title", "").strip()

    # Unknown topics silently fall back to "random".
    requested_topic = form.get("topic", "random")
    known_topics = ("devlog", "showcase", "question", "rant", "fun", "random", "signals")
    topic = requested_topic if requested_topic in known_topics else "random"

    # Each rule pairs a "failed" flag with its message.
    checks = (
        (not content, "Content is required"),
        (bool(content) and len(content) < 10, "Content must be at least 10 characters"),
        (len(content) > 2000, "Content too long (max 2000 characters)"),
    )
    errors = [message for failed, message in checks if failed]

    posts = get_table("posts")
    post = posts.find_one(uid=post_uid)
    owned_by_user = bool(post) and post["user_uid"] == user["uid"]
    if not owned_by_user:
        return RedirectResponse(url="/feed", status_code=302)

    post_url = f"/posts/{post_uid}"
    if errors:
        return RedirectResponse(url=post_url, status_code=302)

    changes = {
        "uid": post_uid,
        "content": content,
        "title": title or None,
        "topic": topic,
    }
    # "uid" is the key column that selects the row to update.
    posts.update(changes, ["uid"])
    logger.info(f"Post {post_uid} edited by {user['username']}")
    return RedirectResponse(url=post_url, status_code=302)
2026-05-11 05:30:51 +02:00
@router.post("/delete/{post_uid}")
async def delete_post(request: Request, post_uid: str):
    """Delete a post owned by the current user, then redirect to ``/feed``.

    Comments whose ``post_uid`` matches and votes whose ``target_uid``
    matches are removed before the post row itself. When the post is missing
    or belongs to another user nothing is deleted; the redirect is the same.
    """
    user = require_user(request)
    posts_table = get_table("posts")
    target = posts_table.find_one(uid=post_uid)

    # Missing post or not the owner: nothing to do.
    if not target or target["user_uid"] != user["uid"]:
        return RedirectResponse(url="/feed", status_code=302)

    # Remove dependent rows first, then the post row by its primary id.
    get_table("comments").delete(post_uid=post_uid)
    get_table("votes").delete(target_uid=post_uid)
    posts_table.delete(id=target["id"])
    logger.info(f"Post {post_uid} deleted by {user['username']}")
    return RedirectResponse(url="/feed", status_code=302)