|
# retoor <retoor@molodetz.nl>
|
|
import logging
|
|
from typing import Any
|
|
from datetime import datetime, timezone
|
|
from fastapi.responses import RedirectResponse
|
|
from devplacepy.database import (
|
|
get_table,
|
|
resolve_by_slug,
|
|
get_users_by_uids,
|
|
get_vote_counts,
|
|
get_user_votes,
|
|
get_reactions_by_targets,
|
|
get_user_bookmarks,
|
|
get_poll_for_post,
|
|
delete_engagement,
|
|
load_comments,
|
|
db,
|
|
)
|
|
|
|
BOOKMARKABLE_TYPES = {"post", "gist", "project", "news"}
|
|
REACTABLE_TYPES = {"post", "comment", "gist", "project"}
|
|
from devplacepy.attachments import delete_attachments_for, delete_inline_image, get_attachments, link_attachments
|
|
from devplacepy.utils import time_ago, generate_uid, make_combined_slug, award_rewards, create_mention_notifications
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def is_owner(item: dict | None, user: dict | None) -> bool:
|
|
return bool(item and user and item["user_uid"] == user["uid"])
|
|
|
|
|
|
def canonical_redirect(area: str, item: dict, requested: str) -> RedirectResponse | None:
|
|
canonical = item.get("slug") or item["uid"]
|
|
if requested == canonical:
|
|
return None
|
|
return RedirectResponse(url=f"/{area}/{canonical}", status_code=301)
|
|
|
|
|
|
def first_image_url(item: dict, attachments: list | None) -> str | None:
|
|
inline = item.get("image")
|
|
if inline:
|
|
return f"/static/uploads/{inline}"
|
|
for attachment in attachments or []:
|
|
if attachment.get("is_image"):
|
|
return attachment["url"]
|
|
return None
|
|
|
|
|
|
def create_content_item(table_name: str, target_type: str, user: dict, fields: dict, slug_source: str, xp: int, badge: str, mention_text: str, attachment_uids: list | None) -> tuple[str, str]:
|
|
uid = generate_uid()
|
|
slug = make_combined_slug(slug_source, uid)
|
|
get_table(table_name).insert({
|
|
"uid": uid,
|
|
"user_uid": user["uid"],
|
|
"slug": slug,
|
|
"stars": 0,
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
**fields,
|
|
})
|
|
award_rewards(user["uid"], xp, badge)
|
|
if attachment_uids:
|
|
link_attachments(attachment_uids, target_type, uid)
|
|
create_mention_notifications(mention_text, user["uid"], f"/{table_name}/{slug}")
|
|
logger.info(f"{target_type} {uid} created by {user['username']}")
|
|
return uid, slug
|
|
|
|
|
|
def detail_context(request, user: dict | None, detail: dict, key: str, seo_ctx: dict, extra: dict | None = None) -> dict:
|
|
context = {
|
|
**seo_ctx,
|
|
"request": request,
|
|
"user": user,
|
|
key: detail["item"],
|
|
"author": detail["author"],
|
|
"is_owner": detail["is_owner"],
|
|
"star_count": detail["star_count"],
|
|
"my_vote": detail["my_vote"],
|
|
"time_ago": detail["time_ago"],
|
|
"comments": detail["comments"],
|
|
"attachments": detail["attachments"],
|
|
"reactions": detail.get("reactions", {"counts": {}, "mine": []}),
|
|
"bookmarked": detail.get("bookmarked", False),
|
|
"poll": detail.get("poll"),
|
|
}
|
|
if extra:
|
|
context.update(extra)
|
|
return context
|
|
|
|
|
|
def edit_content_item(table_name: str, user: dict, slug: str, update_fields: dict, redirect_fail: str) -> RedirectResponse:
|
|
table = get_table(table_name)
|
|
item = resolve_by_slug(table, slug)
|
|
if not is_owner(item, user):
|
|
return RedirectResponse(url=redirect_fail, status_code=302)
|
|
update_fields = {**update_fields, "updated_at": datetime.now(timezone.utc).isoformat()}
|
|
table.update({"uid": item["uid"], **update_fields}, ["uid"])
|
|
logger.info(f"{table_name} {item['uid']} edited by {user['username']}")
|
|
return RedirectResponse(url=f"/{table_name}/{item['slug'] or item['uid']}", status_code=302)
|
|
|
|
|
|
def delete_content_item(table_name: str, target_type: str, user: dict, slug: str, redirect_url: str, inline_image_field: str | None = None) -> RedirectResponse:
|
|
table = get_table(table_name)
|
|
item = resolve_by_slug(table, slug)
|
|
if is_owner(item, user):
|
|
delete_attachments_for(target_type, [item["uid"]])
|
|
comment_uids = []
|
|
if "comments" in db.tables:
|
|
comments = get_table("comments")
|
|
comment_uids = [comment["uid"] for comment in comments.find(target_uid=item["uid"])]
|
|
delete_attachments_for("comment", comment_uids)
|
|
comments.delete(target_uid=item["uid"])
|
|
if "votes" in db.tables:
|
|
votes = get_table("votes")
|
|
votes.delete(target_uid=item["uid"])
|
|
if comment_uids:
|
|
votes.delete(votes.table.columns.target_uid.in_(comment_uids), target_type="comment")
|
|
delete_engagement(target_type, [item["uid"]])
|
|
if comment_uids:
|
|
delete_engagement("comment", comment_uids)
|
|
if inline_image_field:
|
|
delete_inline_image(item.get(inline_image_field))
|
|
table.delete(id=item["id"])
|
|
logger.info(f"{table_name} {item['uid']} deleted by {user['username']}")
|
|
return RedirectResponse(url=redirect_url, status_code=302)
|
|
|
|
|
|
def load_detail(table_name: str, target_type: str, slug: str, user: dict | None) -> dict | None:
|
|
item = resolve_by_slug(get_table(table_name), slug)
|
|
if not item:
|
|
return None
|
|
author = get_users_by_uids([item["user_uid"]]).get(item["user_uid"])
|
|
ups, downs = get_vote_counts([item["uid"]])
|
|
reactions = get_reactions_by_targets(target_type, [item["uid"]], user).get(item["uid"], {"counts": {}, "mine": []}) if target_type in REACTABLE_TYPES else {"counts": {}, "mine": []}
|
|
bookmarked = bool(user) and target_type in BOOKMARKABLE_TYPES and item["uid"] in get_user_bookmarks(user["uid"], target_type, [item["uid"]])
|
|
return {
|
|
"item": item,
|
|
"author": author,
|
|
"is_owner": bool(user and user["uid"] == item["user_uid"]),
|
|
"star_count": ups.get(item["uid"], 0) - downs.get(item["uid"], 0),
|
|
"my_vote": get_user_votes(user["uid"], [item["uid"]]).get(item["uid"], 0) if user else 0,
|
|
"comments": load_comments(target_type, item["uid"], user),
|
|
"attachments": get_attachments(target_type, item["uid"]),
|
|
"time_ago": time_ago(item["created_at"]),
|
|
"reactions": reactions,
|
|
"bookmarked": bookmarked,
|
|
"poll": get_poll_for_post(item["uid"], user) if target_type == "post" else None,
|
|
}
|
|
|
|
|
|
def enrich_items(items: list, key: str, authors: dict, extra_maps: dict[str, Any] | None = None, ts_field: str = "created_at", user: dict | None = None) -> list:
|
|
extra_maps = extra_maps or {}
|
|
my_votes = get_user_votes(user["uid"], [item["uid"] for item in items]) if user else {}
|
|
enriched = []
|
|
for item in items:
|
|
entry = {
|
|
key: item,
|
|
"author": authors.get(item["user_uid"]),
|
|
"time_ago": time_ago(item[ts_field]),
|
|
"my_vote": my_votes.get(item["uid"], 0),
|
|
}
|
|
for name, source in extra_maps.items():
|
|
entry[name] = source(item) if callable(source) else source.get(item["uid"], 0)
|
|
enriched.append(entry)
|
|
return enriched
|