import json
from pathlib import Path
from typing import List, Dict, Optional, Any
import bcrypt
import secrets
import datetime
from .storage_service import UserStorageManager
from .lock_manager import get_lock_manager
class UserService:
    """Manage user records: creation, lookup, authentication, password
    resets, quotas and per-user favorites.

    Two storage backends are supported, selected at construction time:

    * isolated storage (default): every read/write is delegated to a
      ``UserStorageManager`` instance;
    * JSON-file storage: all users are loaded once into an in-memory list
      and the whole list is rewritten to ``users_path`` on every change.

    User records are plain dictionaries. Passwords are stored as bcrypt
    hashes under the ``"password"`` key.
    """

    def __init__(self, users_path: Optional[Path] = None, use_isolated_storage: bool = True):
        """Initialise the service.

        Args:
            users_path: Location of the JSON users file. Only used (and
                required) when ``use_isolated_storage`` is False.
            use_isolated_storage: When True, delegate persistence to
                ``UserStorageManager``; otherwise use the JSON file.

        Raises:
            ValueError: If JSON-file storage is requested without a path.
        """
        # Fail early with a clear message instead of an AttributeError on
        # ``None.exists()`` deep inside ``_load_users``.
        if not use_isolated_storage and users_path is None:
            raise ValueError("users_path is required when use_isolated_storage is False")

        self.use_isolated_storage = use_isolated_storage
        self.lock_manager = get_lock_manager()
        if use_isolated_storage:
            self._storage_manager = UserStorageManager()
            self._users = []
        else:
            # Path(...) also accepts a plain string path.
            self._users_path = Path(users_path)
            self._users = self._load_users()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _load_users(self) -> List[Dict[str, Any]]:
        """Read the users list from the JSON file.

        Returns:
            The decoded JSON content, or an empty list when the file does
            not exist or does not contain valid JSON.
        """
        if not self._users_path.exists():
            return []
        try:
            with open(self._users_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except json.JSONDecodeError:
            # NOTE(review): an unparsable file is treated as "no users";
            # the next save will overwrite it.
            return []

    async def _save_users(self):
        """Write the in-memory users list to the JSON file.

        Does nothing in isolated-storage mode. The write happens while
        holding the lock obtained from the lock manager for the file path.
        """
        if self.use_isolated_storage:
            return
        lock = await self.lock_manager.get_lock(str(self._users_path))
        async with lock:
            with open(self._users_path, "w", encoding="utf-8") as f:
                json.dump(self._users, f, indent=4)

    async def _persist_user(self, email: str, user: Dict[str, Any]) -> None:
        """Persist a modified user record through the active backend."""
        if self.use_isolated_storage:
            await self._storage_manager.save_user(email, user)
        else:
            # ``user`` is an element of ``self._users`` in this mode, so
            # saving the whole list persists the change.
            await self._save_users()

    @staticmethod
    def _hash_password(password: str) -> str:
        """Return the bcrypt hash of ``password`` as a UTF-8 string."""
        return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')

    @staticmethod
    def _has_live_reset_token(user: Dict[str, Any], token: Any) -> bool:
        """Tell whether ``token`` matches the user's unexpired reset token.

        The comparison is constant-time. A missing, non-string or
        malformed token/expiry is treated as "not valid" rather than
        raising. An expiry without timezone information is interpreted
        as UTC.
        """
        stored = user.get("reset_token")
        if not isinstance(stored, str) or not isinstance(token, str):
            return False
        # Compare bytes so that non-ASCII input cannot raise TypeError.
        if not secrets.compare_digest(stored.encode('utf-8'), token.encode('utf-8')):
            return False

        expiry_str = user.get("reset_token_expiry")
        if not expiry_str:
            return False
        try:
            expiry = datetime.datetime.fromisoformat(expiry_str)
        except (TypeError, ValueError):
            return False
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=datetime.timezone.utc)
        return expiry > datetime.datetime.now(datetime.timezone.utc)

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    async def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
        """Return the user record with the given email, or None."""
        if self.use_isolated_storage:
            return await self._storage_manager.get_user(email)
        return next((user for user in self._users if user.get("email") == email), None)

    async def get_all_users(self) -> List[Dict[str, Any]]:
        """Return every user record known to the active backend."""
        if self.use_isolated_storage:
            return await self._storage_manager.get_all_users()
        return self._users

    async def get_users_by_parent_email(self, parent_email: str) -> List[Dict[str, Any]]:
        """Return the users whose ``parent_email`` equals ``parent_email``."""
        if self.use_isolated_storage:
            return await self._storage_manager.list_users_by_parent(parent_email)
        return [user for user in self._users if user.get("parent_email") == parent_email]

    async def get_managed_users(self, user_email: str) -> List[Dict[str, Any]]:
        """Return the users that ``user_email`` manages.

        A user whose ``is_customer`` flag is falsy gets all users; any
        other user gets the users that list them as parent. An unknown
        email yields an empty list.
        """
        user = await self.get_user_by_email(user_email)
        if not user:
            return []
        # A missing flag is treated as "is a customer".
        if not user.get("is_customer", True):
            return await self.get_all_users()
        return await self.get_users_by_parent_email(user_email)

    # ------------------------------------------------------------------
    # Create / update / delete
    # ------------------------------------------------------------------

    async def create_user(self, full_name: str, email: str, password: str, parent_email: Optional[str] = None, is_customer: bool = True) -> Dict[str, Any]:
        """Create and persist a new user.

        Args:
            full_name: Display name of the user.
            email: Email address; must not already be registered.
            password: Plain-text password; only its bcrypt hash is stored.
            parent_email: Optional email of the managing (parent) user.
            is_customer: Value stored in the ``is_customer`` flag.

        Returns:
            The newly created user record.

        Raises:
            ValueError: If a user with this email already exists.
        """
        existing_user = await self.get_user_by_email(email)
        if existing_user:
            raise ValueError("User with this email already exists")

        user = {
            "full_name": full_name,
            "email": email,
            "password": self._hash_password(password),
            "storage_quota_gb": 5,
            "storage_used_gb": 0,
            "reset_token": None,
            "reset_token_expiry": None,
            "parent_email": parent_email,
            "is_customer": is_customer,
        }
        if not self.use_isolated_storage:
            self._users.append(user)
        await self._persist_user(email, user)
        return user

    async def update_user(self, email: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Set arbitrary fields on a user and persist the result.

        A ``password`` keyword is hashed with bcrypt before being stored;
        every other keyword is stored as given.

        Returns:
            The updated record, or None when the user does not exist.
        """
        user = await self.get_user_by_email(email)
        if not user:
            return None
        for key, value in kwargs.items():
            user[key] = self._hash_password(value) if key == "password" else value
        await self._persist_user(email, user)
        return user

    async def delete_user(self, email: str) -> bool:
        """Delete the user with the given email.

        Returns:
            In JSON-file mode, True when a record was removed. In
            isolated mode, whatever the storage manager reports.
        """
        if self.use_isolated_storage:
            return await self._storage_manager.delete_user(email)
        initial_len = len(self._users)
        self._users = [user for user in self._users if user.get("email") != email]
        if len(self._users) < initial_len:
            await self._save_users()
            return True
        return False

    async def delete_users_by_parent_email(self, parent_email: str) -> int:
        """Delete every user whose parent is ``parent_email``.

        Returns:
            The number of users deleted.
        """
        if self.use_isolated_storage:
            users_to_delete = await self.get_users_by_parent_email(parent_email)
            deleted_count = 0
            for user in users_to_delete:
                if await self._storage_manager.delete_user(user["email"]):
                    deleted_count += 1
            return deleted_count

        initial_len = len(self._users)
        self._users = [user for user in self._users if user.get("parent_email") != parent_email]
        deleted_count = initial_len - len(self._users)
        if deleted_count > 0:
            await self._save_users()
        return deleted_count

    # ------------------------------------------------------------------
    # Authentication and password reset
    # ------------------------------------------------------------------

    async def authenticate_user(self, email: str, password: str) -> bool:
        """Check ``password`` against the stored bcrypt hash for ``email``.

        Returns:
            False when the user does not exist; otherwise the result of
            the bcrypt check.
        """
        user = await self.get_user_by_email(email)
        if not user:
            return False
        return bcrypt.checkpw(password.encode('utf-8'), user["password"].encode('utf-8'))

    async def get_user_by_reset_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Return the user holding the unexpired reset token ``token``.

        Returns:
            The matching user record, or None when no user has this token
            or the token has expired.
        """
        all_users = await self.get_all_users()
        for user in all_users:
            if self._has_live_reset_token(user, token):
                return user
        return None

    async def generate_reset_token(self, email: str) -> Optional[str]:
        """Create, store and return a password-reset token for ``email``.

        The token expires one hour after creation (UTC).

        Returns:
            The new token, or None when the user does not exist.
        """
        user = await self.get_user_by_email(email)
        if not user:
            return None
        token = secrets.token_urlsafe(32)
        expiry = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
        user["reset_token"] = token
        user["reset_token_expiry"] = expiry.isoformat()
        await self._persist_user(email, user)
        return token

    async def validate_reset_token(self, email: str, token: str) -> bool:
        """Tell whether ``token`` is the current, unexpired reset token
        of the user identified by ``email``."""
        user = await self.get_user_by_email(email)
        if not user:
            return False
        return self._has_live_reset_token(user, token)

    async def reset_password(self, email: str, token: str, new_password: str) -> bool:
        """Replace the user's password if ``token`` is valid.

        On success the reset token and its expiry are cleared so the
        token cannot be used again.

        Returns:
            True when the password was changed, False otherwise.
        """
        if not await self.validate_reset_token(email, token):
            return False
        user = await self.get_user_by_email(email)
        if not user:
            return False
        user["password"] = self._hash_password(new_password)
        user["reset_token"] = None
        user["reset_token_expiry"] = None
        await self._persist_user(email, user)
        return True

    # ------------------------------------------------------------------
    # Quota
    # ------------------------------------------------------------------

    async def update_user_quota(self, email: str, new_quota_gb: float):
        """Set the user's ``storage_quota_gb`` and persist it.

        Raises:
            ValueError: If the user does not exist.
        """
        user = await self.get_user_by_email(email)
        if not user:
            raise ValueError("User not found")
        user["storage_quota_gb"] = new_quota_gb
        await self._persist_user(email, user)

    # ------------------------------------------------------------------
    # Favorites
    # ------------------------------------------------------------------

    async def add_favorite(self, email: str, file_path: str) -> bool:
        """Add ``file_path`` to the user's favorites (no duplicates).

        Returns:
            False when the user does not exist, True otherwise (also when
            the path was already a favorite).
        """
        user = await self.get_user_by_email(email)
        if not user:
            return False
        favorites = user.setdefault("favorites", [])
        if file_path not in favorites:
            favorites.append(file_path)
        await self._persist_user(email, user)
        return True

    async def remove_favorite(self, email: str, file_path: str) -> bool:
        """Remove ``file_path`` from the user's favorites if present.

        Returns:
            False when the user does not exist, True otherwise (also when
            the path was not a favorite).
        """
        user = await self.get_user_by_email(email)
        if not user:
            return False
        if "favorites" in user and file_path in user["favorites"]:
            user["favorites"].remove(file_path)
        await self._persist_user(email, user)
        return True

    async def get_favorites(self, email: str) -> List[str]:
        """Return the user's favorites, or an empty list when the user
        does not exist or has none."""
        user = await self.get_user_by_email(email)
        if not user:
            return []
        return user.get("favorites", [])

    async def is_favorite(self, email: str, file_path: str) -> bool:
        """Tell whether ``file_path`` is among the user's favorites."""
        user = await self.get_user_by_email(email)
        if not user:
            return False
        return file_path in user.get("favorites", [])