56 lines
2.1 KiB
Python
Raw Normal View History

2025-11-09 23:29:07 +01:00
import os
import aiofiles
from pathlib import Path
from typing import AsyncGenerator
from .settings import settings
class StorageManager:
    """Store per-user files on the local filesystem.

    Every file lives under ``<base_path>/<user_id>/<file_path>``. All public
    methods take a ``user_id`` and a ``file_path`` interpreted relative to
    that user's directory.
    """

    # Number of bytes read per iteration when streaming a file back.
    _CHUNK_SIZE = 8192

    def __init__(self, base_path: str = settings.STORAGE_PATH):
        """Remember the storage root and create it if it does not exist."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    async def _get_full_path(
        self, user_id: int, file_path: str, create_parents: bool = True
    ) -> Path:
        """Map a user-relative path to a location inside the user's directory.

        Args:
            user_id: Owner of the file; names the directory under the root.
            file_path: Path of the file relative to the user's directory.
                Leading slashes are ignored.
            create_parents: When true (the default), create the parent
                directories of the returned path. Callers that only read or
                delete pass ``False`` so that probing a missing path does not
                leave empty directories behind.

        Returns:
            The normalised path ``<base_path>/<user_id>/<relative path>``.

        Raises:
            ValueError: If ``file_path`` would resolve outside the user's
                directory (for example via ``..`` components).
        """
        user_root = self.base_path / str(user_id)
        # Drop every leading slash so the path is always joined as relative.
        relative_path = Path(str(file_path).lstrip("/"))

        # Resolve both sides so ``..`` segments and symlinks cannot be used to
        # step out of the user's directory.
        resolved_root = user_root.resolve()
        resolved_target = (user_root / relative_path).resolve()
        try:
            safe_relative = resolved_target.relative_to(resolved_root)
        except ValueError:
            raise ValueError(
                f"Path escapes user storage: {file_path}"
            ) from None

        # Rebuild on top of ``self.base_path`` (not the resolved root) so the
        # result keeps the same prefix callers have always received and so
        # ``delete_file`` can recognise ``self.base_path`` among its parents.
        full_path = user_root / safe_relative
        if create_parents:
            full_path.parent.mkdir(parents=True, exist_ok=True)
        return full_path

    async def save_file(self, user_id: int, file_path: str, file_content: bytes) -> str:
        """Write ``file_content`` to the user's ``file_path``.

        Parent directories are created as needed and an existing file is
        overwritten.

        Returns:
            The path of the written file, as a string.

        Raises:
            ValueError: If ``file_path`` escapes the user's directory.
        """
        full_path = await self._get_full_path(user_id, file_path)
        async with aiofiles.open(full_path, "wb") as f:
            await f.write(file_content)
        return str(full_path)

    async def get_file(self, user_id: int, file_path: str) -> AsyncGenerator[bytes, None]:
        """Stream the content of the user's ``file_path`` in chunks.

        This is an async generator, so nothing (including the existence
        check) runs until the caller starts iterating.

        Yields:
            Successive ``bytes`` chunks of at most ``_CHUNK_SIZE`` bytes.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If ``file_path`` escapes the user's directory.
        """
        full_path = await self._get_full_path(
            user_id, file_path, create_parents=False
        )
        if not full_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        async with aiofiles.open(full_path, "rb") as f:
            while chunk := await f.read(self._CHUNK_SIZE):
                yield chunk

    async def delete_file(self, user_id: int, file_path: str) -> None:
        """Delete the user's ``file_path`` and prune directories left empty.

        A missing file is not an error. After a successful delete, each
        parent directory up to (but not including) ``base_path`` is removed
        as long as it is empty.

        Raises:
            ValueError: If ``file_path`` escapes the user's directory.
        """
        full_path = await self._get_full_path(
            user_id, file_path, create_parents=False
        )
        try:
            os.remove(full_path)
        except FileNotFoundError:
            # Nothing was removed, so there is nothing to clean up.
            return

        for parent_dir in full_path.parents:
            if parent_dir == self.base_path:
                break
            try:
                # rmdir refuses to remove a non-empty directory, which is
                # exactly the stop condition for the upward walk.
                parent_dir.rmdir()
            except OSError:
                break

    async def file_exists(self, user_id: int, file_path: str) -> bool:
        """Return whether the user's ``file_path`` exists on disk.

        Raises:
            ValueError: If ``file_path`` escapes the user's directory.
        """
        full_path = await self._get_full_path(
            user_id, file_path, create_parents=False
        )
        return full_path.exists()
# Module-level StorageManager instance, constructed with the default base path.
storage_manager = StorageManager()