Update
Some checks failed
DevPlace CI / test (push) Failing after 5m28s

This commit is contained in:
retoor 2026-06-05 04:43:06 +02:00
parent 503aab26ac
commit c826096843
3 changed files with 75 additions and 53 deletions

View File

@ -168,6 +168,7 @@ def store_attachment(file_bytes, original_filename, user_uid):
"image_width": image_width,
"image_height": image_height,
"has_thumbnail": 1 if thumbnail else 0,
"thumbnail_name": thumbnail,
"created_at": datetime.now(timezone.utc).isoformat(),
})
return {
@ -183,43 +184,66 @@ def store_attachment(file_bytes, original_filename, user_uid):
def link_attachments(uids, target_type, target_uid):
if not uids:
flat = [uid.strip() for raw in uids or [] for uid in str(raw).split(",") if uid.strip()]
if not flat:
return
attachments = get_table("attachments")
for raw in uids:
for uid in str(raw).split(","):
uid = uid.strip()
if not uid:
continue
existing = attachments.find_one(uid=uid)
if existing:
attachments.update({"id": existing["id"], "target_type": target_type, "target_uid": target_uid}, ["id"])
placeholders = ",".join(f":p{i}" for i in range(len(flat)))
params = {f"p{i}": uid for i, uid in enumerate(flat)}
db.query(
f"UPDATE attachments SET target_type=:tt, target_uid=:tu WHERE uid IN ({placeholders})",
tt=target_type, tu=target_uid, **params,
)
def _unlink_attachment_files(row):
stored_name = row.get("stored_name", "")
directory = row.get("directory", "")
if not (stored_name and directory):
return
file_path = ATTACHMENTS_DIR / directory / stored_name
try:
file_path.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete attachment file {file_path}: {e}")
for thumb_path in (ATTACHMENTS_DIR / directory).glob(f"{Path(stored_name).stem}_thumb.*"):
try:
thumb_path.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete thumbnail {thumb_path}: {e}")
def _delete_attachment_row(row):
_unlink_attachment_files(row)
get_table("attachments").delete(id=row["id"])
def delete_attachment(uid):
attachments = get_table("attachments")
attachment = attachments.find_one(uid=uid)
if not attachment:
return
stored_name = attachment.get("stored_name", "")
directory = attachment.get("directory", "")
if stored_name and directory:
file_path = ATTACHMENTS_DIR / directory / stored_name
try:
file_path.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete attachment file {file_path}: {e}")
for thumb_path in (ATTACHMENTS_DIR / directory).glob(f"{Path(stored_name).stem}_thumb.*"):
try:
thumb_path.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Failed to delete thumbnail {thumb_path}: {e}")
attachments.delete(id=attachment["id"])
row = get_table("attachments").find_one(uid=uid)
if row:
_delete_attachment_row(row)
def delete_target_attachments(target_type, target_uid):
for attachment in get_table("attachments").find(target_type=target_type, target_uid=target_uid):
delete_attachment(attachment["uid"])
for row in get_table("attachments").find(target_type=target_type, target_uid=target_uid):
_delete_attachment_row(row)
def delete_attachments_for(target_type, target_uids):
uids = [uid for uid in target_uids if uid]
if not uids or "attachments" not in db.tables:
return
placeholders = ",".join(f":p{i}" for i in range(len(uids)))
params = {f"p{i}": uid for i, uid in enumerate(uids)}
rows = list(db.query(
f"SELECT * FROM attachments WHERE target_type=:tt AND target_uid IN ({placeholders})",
tt=target_type, **params,
))
if not rows:
return
for row in rows:
_unlink_attachment_files(row)
ids = ",".join(str(row["id"]) for row in rows)
db.query(f"DELETE FROM attachments WHERE id IN ({ids})")
def get_attachments(target_type, target_uid):
@ -252,9 +276,11 @@ def _row_to_attachment(row):
directory = row.get("directory", "")
thumb_name = None
if row.get("has_thumbnail"):
stem = Path(stored_name).stem
matches = sorted((ATTACHMENTS_DIR / directory).glob(f"{stem}_thumb.*"))
thumb_name = matches[0].name if matches else f"{stem}_thumb.jpg"
thumb_name = row.get("thumbnail_name")
if not thumb_name:
stem = Path(stored_name).stem
png = f"{stem}_thumb.png"
thumb_name = png if (ATTACHMENTS_DIR / directory / png).exists() else f"{stem}_thumb.jpg"
return {
"uid": row["uid"],
"original_filename": row.get("original_filename", ""),

View File

@ -12,7 +12,7 @@ from devplacepy.database import (
load_comments,
db,
)
from devplacepy.attachments import delete_target_attachments, delete_inline_image, get_attachments, link_attachments
from devplacepy.attachments import delete_attachments_for, delete_inline_image, get_attachments, link_attachments
from devplacepy.utils import time_ago, generate_uid, make_combined_slug, award_rewards, create_mention_notifications
logger = logging.getLogger(__name__)
@ -74,13 +74,12 @@ def delete_content_item(table_name: str, target_type: str, user: dict, slug: str
table = get_table(table_name)
item = resolve_by_slug(table, slug)
if is_owner(item, user):
delete_target_attachments(target_type, item["uid"])
delete_attachments_for(target_type, [item["uid"]])
comment_uids = []
if "comments" in db.tables:
comments = get_table("comments")
for comment in comments.find(target_uid=item["uid"]):
comment_uids.append(comment["uid"])
delete_target_attachments("comment", comment["uid"])
comment_uids = [comment["uid"] for comment in comments.find(target_uid=item["uid"])]
delete_attachments_for("comment", comment_uids)
comments.delete(target_uid=item["uid"])
if "votes" in db.tables:
votes = get_table("votes")

View File

@ -278,15 +278,17 @@ def award_xp(user_uid: str, amount: int) -> dict:
def check_milestone_badges(user_uid: str) -> list:
held = {row["badge_name"] for row in get_table("badges").find(user_uid=user_uid)}
awarded = []
if get_table("posts").count(user_uid=user_uid) >= 10 and award_badge(user_uid, "Prolific"):
if "Prolific" not in held and get_table("posts").count(user_uid=user_uid) >= 10 and award_badge(user_uid, "Prolific"):
awarded.append("Prolific")
stars = get_user_stars(user_uid)
if stars >= 100 and award_badge(user_uid, "Star Author"):
awarded.append("Star Author")
if stars >= 25 and award_badge(user_uid, "Rising Star"):
awarded.append("Rising Star")
if get_table("follows").count(following_uid=user_uid) >= 10 and award_badge(user_uid, "Popular"):
if {"Star Author", "Rising Star"} - held:
stars = get_user_stars(user_uid)
if "Star Author" not in held and stars >= 100 and award_badge(user_uid, "Star Author"):
awarded.append("Star Author")
if "Rising Star" not in held and stars >= 25 and award_badge(user_uid, "Rising Star"):
awarded.append("Rising Star")
if "Popular" not in held and get_table("follows").count(following_uid=user_uid) >= 10 and award_badge(user_uid, "Popular"):
awarded.append("Popular")
for badge_name in awarded:
notify_badge(user_uid, badge_name)
@ -301,7 +303,7 @@ def award_rewards(user_uid: str, amount: int, first_badge: str | None = None) ->
def create_mention_notifications(content: str, actor_uid: str, target_url: str) -> None:
usernames = extract_mentions(content)
usernames = list(dict.fromkeys(extract_mentions(content)))
if not usernames:
return
users = get_table("users")
@ -309,13 +311,8 @@ def create_mention_notifications(content: str, actor_uid: str, target_url: str)
if not actor:
return
actor_username = actor["username"]
seen = set()
for username in usernames:
if username in seen:
continue
seen.add(username)
mentioned = users.find_one(username=username)
if mentioned and mentioned["uid"] != actor_uid:
for mentioned in users.find(users.table.columns.username.in_(usernames)):
if mentioned["uid"] != actor_uid:
create_notification(mentioned["uid"], "mention", f"@{actor_username} mentioned you", actor_uid, target_url)