152 lines
6.2 KiB
Python
152 lines
6.2 KiB
Python
|
|
import asyncio
import datetime
import json
import shutil
import uuid
from pathlib import Path

import aiofiles
|
||
|
|
|
||
|
|
class FileService:
|
||
|
|
def __init__(self, base_dir: Path, users_data_path: Path):
|
||
|
|
self.base_dir = base_dir
|
||
|
|
self.users_data_path = users_data_path
|
||
|
|
self.base_dir.mkdir(parents=True, exist_ok=True)
|
||
|
|
|
||
|
|
async def _load_users_data(self):
|
||
|
|
"""Loads user data from the JSON file."""
|
||
|
|
if not self.users_data_path.exists():
|
||
|
|
return {}
|
||
|
|
async with aiofiles.open(self.users_data_path, mode="r") as f:
|
||
|
|
content = await f.read()
|
||
|
|
return json.loads(content) if content else {}
|
||
|
|
|
||
|
|
async def _save_users_data(self, data):
|
||
|
|
"""Saves user data to the JSON file."""
|
||
|
|
async with aiofiles.open(self.users_data_path, mode="w") as f:
|
||
|
|
await f.write(json.dumps(data, indent=4))
|
||
|
|
|
||
|
|
def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
|
||
|
|
"""Constructs the absolute path for a user's file or directory."""
|
||
|
|
user_dir = self.base_dir / user_email
|
||
|
|
return user_dir / relative_path
|
||
|
|
|
||
|
|
async def list_files(self, user_email: str, path: str = "") -> list:
|
||
|
|
"""Lists files and directories for a given user within a specified path."""
|
||
|
|
user_path = self._get_user_file_path(user_email, path)
|
||
|
|
if not user_path.is_dir():
|
||
|
|
return []
|
||
|
|
|
||
|
|
files_list = []
|
||
|
|
for item in user_path.iterdir():
|
||
|
|
if item.name.startswith('.'): # Ignore hidden files/directories
|
||
|
|
continue
|
||
|
|
file_info = {
|
||
|
|
"name": item.name,
|
||
|
|
"is_dir": item.is_dir(),
|
||
|
|
"path": str(item.relative_to(self._get_user_file_path(user_email))),
|
||
|
|
"size": item.stat().st_size if item.is_file() else 0,
|
||
|
|
"last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
|
||
|
|
}
|
||
|
|
files_list.append(file_info)
|
||
|
|
return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
|
||
|
|
|
||
|
|
async def create_folder(self, user_email: str, folder_path: str) -> bool:
|
||
|
|
"""Creates a new folder for the user."""
|
||
|
|
full_path = self._get_user_file_path(user_email, folder_path)
|
||
|
|
if full_path.exists():
|
||
|
|
return False # Folder already exists
|
||
|
|
full_path.mkdir(parents=True, exist_ok=True)
|
||
|
|
return True
|
||
|
|
|
||
|
|
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
|
||
|
|
"""Uploads a file for the user."""
|
||
|
|
full_path = self._get_user_file_path(user_email, file_path)
|
||
|
|
full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
|
||
|
|
async with aiofiles.open(full_path, mode="wb") as f:
|
||
|
|
await f.write(content)
|
||
|
|
return True
|
||
|
|
|
||
|
|
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
|
||
|
|
"""Downloads a file for the user."""
|
||
|
|
full_path = self._get_user_file_path(user_email, file_path)
|
||
|
|
if full_path.is_file():
|
||
|
|
async with aiofiles.open(full_path, mode="rb") as f:
|
||
|
|
content = await f.read()
|
||
|
|
return content, full_path.name
|
||
|
|
return None
|
||
|
|
|
||
|
|
async def delete_item(self, user_email: str, item_path: str) -> bool:
|
||
|
|
"""Deletes a file or folder for the user."""
|
||
|
|
full_path = self._get_user_file_path(user_email, item_path)
|
||
|
|
if not full_path.exists():
|
||
|
|
return False
|
||
|
|
|
||
|
|
if full_path.is_file():
|
||
|
|
full_path.unlink()
|
||
|
|
elif full_path.is_dir():
|
||
|
|
shutil.rmtree(full_path)
|
||
|
|
return True
|
||
|
|
|
||
|
|
async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
|
||
|
|
"""Generates a shareable link for a file or folder."""
|
||
|
|
users_data = await self._load_users_data()
|
||
|
|
user = users_data.get(user_email)
|
||
|
|
if not user:
|
||
|
|
return None
|
||
|
|
|
||
|
|
full_path = self._get_user_file_path(user_email, item_path)
|
||
|
|
if not full_path.exists():
|
||
|
|
return None
|
||
|
|
|
||
|
|
share_id = str(uuid.uuid4())
|
||
|
|
if "shared_items" not in user:
|
||
|
|
user["shared_items"] = {}
|
||
|
|
user["shared_items"][share_id] = {
|
||
|
|
"user_email": user_email,
|
||
|
|
"item_path": item_path,
|
||
|
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||
|
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
|
||
|
|
}
|
||
|
|
await self._save_users_data(users_data)
|
||
|
|
return share_id
|
||
|
|
|
||
|
|
async def get_shared_item(self, share_id: str) -> dict | None:
|
||
|
|
"""Retrieves information about a shared item."""
|
||
|
|
users_data = await self._load_users_data()
|
||
|
|
for user_email, user_info in users_data.items():
|
||
|
|
if "shared_items" in user_info and share_id in user_info["shared_items"]:
|
||
|
|
shared_item = user_info["shared_items"][share_id]
|
||
|
|
expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
|
||
|
|
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
|
||
|
|
return shared_item
|
||
|
|
return None
|
||
|
|
|
||
|
|
async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None:
|
||
|
|
"""Retrieves the content of a shared file."""
|
||
|
|
shared_item = await self.get_shared_item(share_id)
|
||
|
|
if not shared_item:
|
||
|
|
return None
|
||
|
|
|
||
|
|
user_email = shared_item["user_email"]
|
||
|
|
item_path = shared_item["item_path"]
|
||
|
|
full_path = self._get_user_file_path(user_email, item_path)
|
||
|
|
|
||
|
|
if full_path.is_file():
|
||
|
|
async with aiofiles.open(full_path, mode="rb") as f:
|
||
|
|
content = await f.read()
|
||
|
|
return content, full_path.name
|
||
|
|
return None
|
||
|
|
|
||
|
|
async def get_shared_folder_content(self, share_id: str) -> list | None:
|
||
|
|
"""Retrieves the content of a shared folder."""
|
||
|
|
shared_item = await self.get_shared_item(share_id)
|
||
|
|
if not shared_item:
|
||
|
|
return None
|
||
|
|
|
||
|
|
user_email = shared_item["user_email"]
|
||
|
|
item_path = shared_item["item_path"]
|
||
|
|
full_path = self._get_user_file_path(user_email, item_path)
|
||
|
|
|
||
|
|
if full_path.is_dir():
|
||
|
|
return await self.list_files(user_email, item_path)
|
||
|
|
return None
|