import asyncio from pathlib import Path from PIL import Image import subprocess from typing import Optional from .settings import settings THUMBNAIL_SIZE = (300, 300) THUMBNAIL_DIR = "thumbnails" async def generate_thumbnail( file_path: str, mime_type: str, user_id: int ) -> Optional[str]: try: if mime_type.startswith("image/"): return await generate_image_thumbnail(file_path, user_id) elif mime_type.startswith("video/"): return await generate_video_thumbnail(file_path, user_id) return None except Exception as e: print(f"Error generating thumbnail for {file_path}: {e}") return None async def generate_image_thumbnail(file_path: str, user_id: int) -> Optional[str]: loop = asyncio.get_event_loop() def _generate(): base_path = Path(settings.STORAGE_PATH) thumbnail_dir = base_path / str(user_id) / THUMBNAIL_DIR thumbnail_dir.mkdir(parents=True, exist_ok=True) file_name = Path(file_path).name thumbnail_name = f"thumb_{file_name}" if not thumbnail_name.lower().endswith((".jpg", ".jpeg", ".png")): thumbnail_name += ".jpg" thumbnail_path = thumbnail_dir / thumbnail_name actual_file_path = ( base_path / str(user_id) / file_path if not Path(file_path).is_absolute() else Path(file_path) ) with Image.open(actual_file_path) as img: img.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS) if img.mode in ("RGBA", "LA", "P"): background = Image.new("RGB", img.size, (255, 255, 255)) if img.mode == "P": img = img.convert("RGBA") background.paste( img, mask=img.split()[-1] if img.mode in ("RGBA", "LA") else None ) img = background img.save(str(thumbnail_path), "JPEG", quality=85, optimize=True) return str(thumbnail_path.relative_to(base_path / str(user_id))) return await loop.run_in_executor(None, _generate) async def generate_video_thumbnail(file_path: str, user_id: int) -> Optional[str]: loop = asyncio.get_event_loop() def _generate(): base_path = Path(settings.STORAGE_PATH) thumbnail_dir = base_path / str(user_id) / THUMBNAIL_DIR thumbnail_dir.mkdir(parents=True, exist_ok=True) file_name = Path(file_path).stem thumbnail_name = f"thumb_{file_name}.jpg" thumbnail_path = thumbnail_dir / thumbnail_name actual_file_path = ( base_path / str(user_id) / file_path if not Path(file_path).is_absolute() else Path(file_path) ) subprocess.run( [ "ffmpeg", "-i", str(actual_file_path), "-ss", "00:00:01", "-vframes", "1", "-vf", f"scale={THUMBNAIL_SIZE[0]}:{THUMBNAIL_SIZE[1]}:force_original_aspect_ratio=decrease", "-y", str(thumbnail_path), ], check=True, capture_output=True, ) return str(thumbnail_path.relative_to(base_path / str(user_id))) try: return await loop.run_in_executor(None, _generate) except subprocess.CalledProcessError: return None async def delete_thumbnail(thumbnail_path: str, user_id: int): try: base_path = Path(settings.STORAGE_PATH) full_path = base_path / str(user_id) / thumbnail_path if full_path.exists(): full_path.unlink() except Exception as e: print(f"Error deleting thumbnail {thumbnail_path}: {e}")