|
import logging
|
|
from pathlib import Path
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import JSONResponse
|
|
from devplacepy.database import get_table, get_setting, get_int_setting
|
|
from devplacepy.utils import require_user
|
|
from devplacepy.attachments import store_attachment, delete_attachment as _delete_attachment, ALLOWED_UPLOAD_TYPES
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post("/upload")
|
|
async def upload_file(request: Request):
|
|
user = require_user(request)
|
|
form = await request.form()
|
|
file = form.get("file")
|
|
|
|
if not file or not hasattr(file, "filename") or not file.filename:
|
|
return JSONResponse({"error": "No file provided"}, status_code=400)
|
|
|
|
ext = Path(file.filename).suffix.lower()
|
|
if ext not in ALLOWED_UPLOAD_TYPES:
|
|
return JSONResponse({"error": f"File type '{ext}' not allowed"}, status_code=415)
|
|
|
|
allowed_types_raw = get_setting("allowed_file_types", "").strip()
|
|
allowed_extensions = set()
|
|
if allowed_types_raw:
|
|
for allowed_ext in allowed_types_raw.split(","):
|
|
allowed_ext = allowed_ext.strip().lower()
|
|
allowed_extensions.add(allowed_ext if allowed_ext.startswith(".") else f".{allowed_ext}")
|
|
if allowed_extensions and ext not in allowed_extensions:
|
|
return JSONResponse({"error": f"File type '{ext}' not allowed"}, status_code=415)
|
|
|
|
try:
|
|
content = await file.read()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to read uploaded file: {e}")
|
|
return JSONResponse({"error": "Failed to read file"}, status_code=400)
|
|
|
|
result = store_attachment(content, file.filename, user["uid"])
|
|
if result is None:
|
|
max_size_mb = get_int_setting("max_upload_size_mb", 10)
|
|
return JSONResponse({"error": f"File exceeds {max_size_mb}MB limit"}, status_code=413)
|
|
|
|
logger.info(f"File uploaded: {file.filename} ({len(content)} bytes) -> {result['url']}")
|
|
return JSONResponse(result, status_code=201)
|
|
|
|
|
|
@router.delete("/delete/{attachment_uid}")
|
|
async def delete_attachment_route(request: Request, attachment_uid: str):
|
|
user = require_user(request)
|
|
att = get_table("attachments").find_one(uid=attachment_uid)
|
|
if not att:
|
|
return JSONResponse({"error": "Attachment not found"}, status_code=404)
|
|
if att.get("user_uid") and att["user_uid"] != user["uid"]:
|
|
return JSONResponse({"error": "Not authorized"}, status_code=403)
|
|
_delete_attachment(attachment_uid)
|
|
logger.info(f"Attachment {attachment_uid} deleted by {user['username']}")
|
|
return JSONResponse({"status": "deleted"})
|