From c82609684331ba762885330f4f3a5d469efb7034 Mon Sep 17 00:00:00 2001 From: retoor Date: Fri, 5 Jun 2026 04:43:06 +0200 Subject: [PATCH] Update --- devplacepy/attachments.py | 92 +++++++++++++++++++++++++-------------- devplacepy/content.py | 9 ++-- devplacepy/utils.py | 27 +++++------- 3 files changed, 75 insertions(+), 53 deletions(-) diff --git a/devplacepy/attachments.py b/devplacepy/attachments.py index d092401..1c35fa7 100644 --- a/devplacepy/attachments.py +++ b/devplacepy/attachments.py @@ -168,6 +168,7 @@ def store_attachment(file_bytes, original_filename, user_uid): "image_width": image_width, "image_height": image_height, "has_thumbnail": 1 if thumbnail else 0, + "thumbnail_name": thumbnail, "created_at": datetime.now(timezone.utc).isoformat(), }) return { @@ -183,43 +184,66 @@ def store_attachment(file_bytes, original_filename, user_uid): def link_attachments(uids, target_type, target_uid): - if not uids: + flat = [uid.strip() for raw in uids or [] for uid in str(raw).split(",") if uid.strip()] + if not flat: return - attachments = get_table("attachments") - for raw in uids: - for uid in str(raw).split(","): - uid = uid.strip() - if not uid: - continue - existing = attachments.find_one(uid=uid) - if existing: - attachments.update({"id": existing["id"], "target_type": target_type, "target_uid": target_uid}, ["id"]) + placeholders = ",".join(f":p{i}" for i in range(len(flat))) + params = {f"p{i}": uid for i, uid in enumerate(flat)} + db.query( + f"UPDATE attachments SET target_type=:tt, target_uid=:tu WHERE uid IN ({placeholders})", + tt=target_type, tu=target_uid, **params, + ) + + +def _unlink_attachment_files(row): + stored_name = row.get("stored_name", "") + directory = row.get("directory", "") + if not (stored_name and directory): + return + file_path = ATTACHMENTS_DIR / directory / stored_name + try: + file_path.unlink(missing_ok=True) + except Exception as e: + logger.warning(f"Failed to delete attachment file {file_path}: {e}") + for thumb_path in (ATTACHMENTS_DIR / directory).glob(f"{Path(stored_name).stem}_thumb.*"): + try: + thumb_path.unlink(missing_ok=True) + except Exception as e: + logger.warning(f"Failed to delete thumbnail {thumb_path}: {e}") + + +def _delete_attachment_row(row): + _unlink_attachment_files(row) + get_table("attachments").delete(id=row["id"]) def delete_attachment(uid): - attachments = get_table("attachments") - attachment = attachments.find_one(uid=uid) - if not attachment: - return - stored_name = attachment.get("stored_name", "") - directory = attachment.get("directory", "") - if stored_name and directory: - file_path = ATTACHMENTS_DIR / directory / stored_name - try: - file_path.unlink(missing_ok=True) - except Exception as e: - logger.warning(f"Failed to delete attachment file {file_path}: {e}") - for thumb_path in (ATTACHMENTS_DIR / directory).glob(f"{Path(stored_name).stem}_thumb.*"): - try: - thumb_path.unlink(missing_ok=True) - except Exception as e: - logger.warning(f"Failed to delete thumbnail {thumb_path}: {e}") - attachments.delete(id=attachment["id"]) + row = get_table("attachments").find_one(uid=uid) + if row: + _delete_attachment_row(row) def delete_target_attachments(target_type, target_uid): - for attachment in get_table("attachments").find(target_type=target_type, target_uid=target_uid): - delete_attachment(attachment["uid"]) + for row in get_table("attachments").find(target_type=target_type, target_uid=target_uid): + _delete_attachment_row(row) + + +def delete_attachments_for(target_type, target_uids): + uids = [uid for uid in target_uids if uid] + if not uids or "attachments" not in db.tables: + return + placeholders = ",".join(f":p{i}" for i in range(len(uids))) + params = {f"p{i}": uid for i, uid in enumerate(uids)} + rows = list(db.query( + f"SELECT * FROM attachments WHERE target_type=:tt AND target_uid IN ({placeholders})", + tt=target_type, **params, + )) + if not rows: + return + for row in rows: + _unlink_attachment_files(row) + ids = ",".join(str(row["id"]) for row in rows) + db.query(f"DELETE FROM attachments WHERE id IN ({ids})") def get_attachments(target_type, target_uid): @@ -252,9 +276,11 @@ def _row_to_attachment(row): directory = row.get("directory", "") thumb_name = None if row.get("has_thumbnail"): - stem = Path(stored_name).stem - matches = sorted((ATTACHMENTS_DIR / directory).glob(f"{stem}_thumb.*")) - thumb_name = matches[0].name if matches else f"{stem}_thumb.jpg" + thumb_name = row.get("thumbnail_name") + if not thumb_name: + stem = Path(stored_name).stem + png = f"{stem}_thumb.png" + thumb_name = png if (ATTACHMENTS_DIR / directory / png).exists() else f"{stem}_thumb.jpg" return { "uid": row["uid"], "original_filename": row.get("original_filename", ""), diff --git a/devplacepy/content.py b/devplacepy/content.py index aeb29fa..c300911 100644 --- a/devplacepy/content.py +++ b/devplacepy/content.py @@ -12,7 +12,7 @@ from devplacepy.database import ( load_comments, db, ) -from devplacepy.attachments import delete_target_attachments, delete_inline_image, get_attachments, link_attachments +from devplacepy.attachments import delete_attachments_for, delete_inline_image, get_attachments, link_attachments from devplacepy.utils import time_ago, generate_uid, make_combined_slug, award_rewards, create_mention_notifications logger = logging.getLogger(__name__) @@ -74,13 +74,12 @@ def delete_content_item(table_name: str, target_type: str, user: dict, slug: str table = get_table(table_name) item = resolve_by_slug(table, slug) if is_owner(item, user): - delete_target_attachments(target_type, item["uid"]) + delete_attachments_for(target_type, [item["uid"]]) comment_uids = [] if "comments" in db.tables: comments = get_table("comments") - for comment in comments.find(target_uid=item["uid"]): - comment_uids.append(comment["uid"]) - delete_target_attachments("comment", comment["uid"]) + comment_uids = [comment["uid"] for comment in comments.find(target_uid=item["uid"])] + delete_attachments_for("comment", comment_uids) comments.delete(target_uid=item["uid"]) if "votes" in db.tables: votes = get_table("votes") diff --git a/devplacepy/utils.py b/devplacepy/utils.py index fad6c40..2e367eb 100644 --- a/devplacepy/utils.py +++ b/devplacepy/utils.py @@ -278,15 +278,17 @@ def award_xp(user_uid: str, amount: int) -> dict: def check_milestone_badges(user_uid: str) -> list: + held = {row["badge_name"] for row in get_table("badges").find(user_uid=user_uid)} awarded = [] - if get_table("posts").count(user_uid=user_uid) >= 10 and award_badge(user_uid, "Prolific"): + if "Prolific" not in held and get_table("posts").count(user_uid=user_uid) >= 10 and award_badge(user_uid, "Prolific"): awarded.append("Prolific") - stars = get_user_stars(user_uid) - if stars >= 100 and award_badge(user_uid, "Star Author"): - awarded.append("Star Author") - if stars >= 25 and award_badge(user_uid, "Rising Star"): - awarded.append("Rising Star") - if get_table("follows").count(following_uid=user_uid) >= 10 and award_badge(user_uid, "Popular"): + if {"Star Author", "Rising Star"} - held: + stars = get_user_stars(user_uid) + if "Star Author" not in held and stars >= 100 and award_badge(user_uid, "Star Author"): + awarded.append("Star Author") + if "Rising Star" not in held and stars >= 25 and award_badge(user_uid, "Rising Star"): + awarded.append("Rising Star") + if "Popular" not in held and get_table("follows").count(following_uid=user_uid) >= 10 and award_badge(user_uid, "Popular"): awarded.append("Popular") for badge_name in awarded: notify_badge(user_uid, badge_name) @@ -301,7 +303,7 @@ def award_rewards(user_uid: str, amount: int, first_badge: str | None = None) -> def create_mention_notifications(content: str, actor_uid: str, target_url: str) -> None: - usernames = extract_mentions(content) + usernames = list(dict.fromkeys(extract_mentions(content))) if not usernames: return users = get_table("users") @@ -309,13 +311,8 @@ def create_mention_notifications(content: str, actor_uid: str, target_url: str) if not actor: return actor_username = actor["username"] - seen = set() - for username in usernames: - if username in seen: - continue - seen.add(username) - mentioned = users.find_one(username=username) - if mentioned and mentioned["uid"] != actor_uid: + for mentioned in users.find(users.table.columns.username.in_(usernames)): + if mentioned["uid"] != actor_uid: create_notification(mentioned["uid"], "mention", f"@{actor_username} mentioned you", actor_uid, target_url)