|
import os
|
|
import asyncio
|
|
from pathlib import Path
|
|
from PIL import Image
|
|
import subprocess
|
|
from typing import Optional
|
|
from .settings import settings
|
|
|
|
THUMBNAIL_SIZE = (300, 300)
|
|
THUMBNAIL_DIR = "thumbnails"
|
|
|
|
async def generate_thumbnail(file_path: str, mime_type: str, user_id: int) -> Optional[str]:
|
|
try:
|
|
if mime_type.startswith("image/"):
|
|
return await generate_image_thumbnail(file_path, user_id)
|
|
elif mime_type.startswith("video/"):
|
|
return await generate_video_thumbnail(file_path, user_id)
|
|
return None
|
|
except Exception as e:
|
|
print(f"Error generating thumbnail for {file_path}: {e}")
|
|
return None
|
|
|
|
async def generate_image_thumbnail(file_path: str, user_id: int) -> Optional[str]:
|
|
loop = asyncio.get_event_loop()
|
|
|
|
def _generate():
|
|
base_path = Path(settings.STORAGE_PATH)
|
|
thumbnail_dir = base_path / str(user_id) / THUMBNAIL_DIR
|
|
thumbnail_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
file_name = Path(file_path).name
|
|
thumbnail_name = f"thumb_{file_name}"
|
|
if not thumbnail_name.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
thumbnail_name += ".jpg"
|
|
|
|
thumbnail_path = thumbnail_dir / thumbnail_name
|
|
|
|
actual_file_path = base_path / str(user_id) / file_path if not Path(file_path).is_absolute() else Path(file_path)
|
|
|
|
with Image.open(actual_file_path) as img:
|
|
img.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS)
|
|
|
|
if img.mode in ("RGBA", "LA", "P"):
|
|
background = Image.new("RGB", img.size, (255, 255, 255))
|
|
if img.mode == "P":
|
|
img = img.convert("RGBA")
|
|
background.paste(img, mask=img.split()[-1] if img.mode in ("RGBA", "LA") else None)
|
|
img = background
|
|
|
|
img.save(str(thumbnail_path), "JPEG", quality=85, optimize=True)
|
|
|
|
return str(thumbnail_path.relative_to(base_path / str(user_id)))
|
|
|
|
return await loop.run_in_executor(None, _generate)
|
|
|
|
async def generate_video_thumbnail(file_path: str, user_id: int) -> Optional[str]:
|
|
loop = asyncio.get_event_loop()
|
|
|
|
def _generate():
|
|
base_path = Path(settings.STORAGE_PATH)
|
|
thumbnail_dir = base_path / str(user_id) / THUMBNAIL_DIR
|
|
thumbnail_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
file_name = Path(file_path).stem
|
|
thumbnail_name = f"thumb_{file_name}.jpg"
|
|
thumbnail_path = thumbnail_dir / thumbnail_name
|
|
|
|
actual_file_path = base_path / str(user_id) / file_path if not Path(file_path).is_absolute() else Path(file_path)
|
|
|
|
subprocess.run([
|
|
"ffmpeg",
|
|
"-i", str(actual_file_path),
|
|
"-ss", "00:00:01",
|
|
"-vframes", "1",
|
|
"-vf", f"scale={THUMBNAIL_SIZE[0]}:{THUMBNAIL_SIZE[1]}:force_original_aspect_ratio=decrease",
|
|
"-y",
|
|
str(thumbnail_path)
|
|
], check=True, capture_output=True)
|
|
|
|
return str(thumbnail_path.relative_to(base_path / str(user_id)))
|
|
|
|
try:
|
|
return await loop.run_in_executor(None, _generate)
|
|
except subprocess.CalledProcessError:
|
|
return None
|
|
|
|
async def delete_thumbnail(thumbnail_path: str, user_id: int):
|
|
try:
|
|
base_path = Path(settings.STORAGE_PATH)
|
|
full_path = base_path / str(user_id) / thumbnail_path
|
|
if full_path.exists():
|
|
full_path.unlink()
|
|
except Exception as e:
|
|
print(f"Error deleting thumbnail {thumbnail_path}: {e}")
|