|
# retoor <retoor@molodetz.nl>
|
|
import logging
|
|
from typing import Any
|
|
from datetime import datetime, timezone
|
|
from fastapi.responses import RedirectResponse
|
|
from devplacepy.database import (
|
|
get_table,
|
|
resolve_by_slug,
|
|
get_users_by_uids,
|
|
get_vote_counts,
|
|
get_user_votes,
|
|
load_comments,
|
|
db,
|
|
)
|
|
from devplacepy.attachments import delete_target_attachments, delete_inline_image, get_attachments, link_attachments
|
|
from devplacepy.utils import time_ago, generate_uid, make_combined_slug, award_rewards, create_mention_notifications
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def is_owner(item: dict | None, user: dict | None) -> bool:
|
|
return bool(item and user and item["user_uid"] == user["uid"])
|
|
|
|
|
|
def create_content_item(table_name: str, target_type: str, user: dict, fields: dict, slug_source: str, xp: int, badge: str, mention_text: str, attachment_uids: list | None) -> tuple[str, str]:
|
|
uid = generate_uid()
|
|
slug = make_combined_slug(slug_source, uid)
|
|
get_table(table_name).insert({
|
|
"uid": uid,
|
|
"user_uid": user["uid"],
|
|
"slug": slug,
|
|
"stars": 0,
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
**fields,
|
|
})
|
|
award_rewards(user["uid"], xp, badge)
|
|
if attachment_uids:
|
|
link_attachments(attachment_uids, target_type, uid)
|
|
create_mention_notifications(mention_text, user["uid"], f"/{table_name}/{slug}")
|
|
logger.info(f"{target_type} {uid} created by {user['username']}")
|
|
return uid, slug
|
|
|
|
|
|
def detail_context(request, user: dict | None, detail: dict, key: str, seo_ctx: dict, extra: dict | None = None) -> dict:
|
|
context = {
|
|
**seo_ctx,
|
|
"request": request,
|
|
"user": user,
|
|
key: detail["item"],
|
|
"author": detail["author"],
|
|
"is_owner": detail["is_owner"],
|
|
"star_count": detail["star_count"],
|
|
"my_vote": detail["my_vote"],
|
|
"time_ago": detail["time_ago"],
|
|
"comments": detail["comments"],
|
|
"attachments": detail["attachments"],
|
|
}
|
|
if extra:
|
|
context.update(extra)
|
|
return context
|
|
|
|
|
|
def edit_content_item(table_name: str, user: dict, slug: str, update_fields: dict, redirect_fail: str) -> RedirectResponse:
|
|
table = get_table(table_name)
|
|
item = resolve_by_slug(table, slug)
|
|
if not is_owner(item, user):
|
|
return RedirectResponse(url=redirect_fail, status_code=302)
|
|
table.update({"uid": item["uid"], **update_fields}, ["uid"])
|
|
logger.info(f"{table_name} {item['uid']} edited by {user['username']}")
|
|
return RedirectResponse(url=f"/{table_name}/{item['slug'] or item['uid']}", status_code=302)
|
|
|
|
|
|
def delete_content_item(table_name: str, target_type: str, user: dict, slug: str, redirect_url: str, inline_image_field: str | None = None) -> RedirectResponse:
|
|
table = get_table(table_name)
|
|
item = resolve_by_slug(table, slug)
|
|
if is_owner(item, user):
|
|
delete_target_attachments(target_type, item["uid"])
|
|
if "comments" in db.tables:
|
|
comments = get_table("comments")
|
|
for comment in comments.find(target_uid=item["uid"]):
|
|
delete_target_attachments("comment", comment["uid"])
|
|
comments.delete(target_uid=item["uid"])
|
|
if "votes" in db.tables:
|
|
get_table("votes").delete(target_uid=item["uid"])
|
|
if inline_image_field:
|
|
delete_inline_image(item.get(inline_image_field))
|
|
table.delete(id=item["id"])
|
|
logger.info(f"{table_name} {item['uid']} deleted by {user['username']}")
|
|
return RedirectResponse(url=redirect_url, status_code=302)
|
|
|
|
|
|
def load_detail(table_name: str, target_type: str, slug: str, user: dict | None) -> dict | None:
|
|
item = resolve_by_slug(get_table(table_name), slug)
|
|
if not item:
|
|
return None
|
|
author = get_users_by_uids([item["user_uid"]]).get(item["user_uid"])
|
|
ups, downs = get_vote_counts([item["uid"]])
|
|
return {
|
|
"item": item,
|
|
"author": author,
|
|
"is_owner": bool(user and user["uid"] == item["user_uid"]),
|
|
"star_count": ups.get(item["uid"], 0) - downs.get(item["uid"], 0),
|
|
"my_vote": get_user_votes(user["uid"], [item["uid"]]).get(item["uid"], 0) if user else 0,
|
|
"comments": load_comments(target_type, item["uid"], user),
|
|
"attachments": get_attachments(target_type, item["uid"]),
|
|
"time_ago": time_ago(item["created_at"]),
|
|
}
|
|
|
|
|
|
def enrich_items(items: list, key: str, authors: dict, extra_maps: dict[str, Any] | None = None, ts_field: str = "created_at", user: dict | None = None) -> list:
|
|
extra_maps = extra_maps or {}
|
|
my_votes = get_user_votes(user["uid"], [item["uid"] for item in items]) if user else {}
|
|
enriched = []
|
|
for item in items:
|
|
entry = {
|
|
key: item,
|
|
"author": authors.get(item["user_uid"]),
|
|
"time_ago": time_ago(item[ts_field]),
|
|
"my_vote": my_votes.get(item["uid"], 0),
|
|
}
|
|
for name, source in extra_maps.items():
|
|
entry[name] = source(item) if callable(source) else source.get(item["uid"], 0)
|
|
enriched.append(entry)
|
|
return enriched
|