import json
from pathlib import Path
from typing import List, Dict, Optional, Any
import bcrypt # Import bcrypt
import secrets # For generating secure tokens
import datetime # For token expiry
class UserService:
def __init__(self, users_path: Path):
self._users_path = users_path
self._users = self._load_users()
def _load_users(self) -> List[Dict[str, Any]]:
if not self._users_path.exists():
return []
with open(self._users_path, "r") as f:
return json.load(f)
def _save_users(self):
with open(self._users_path, "w") as f:
json.dump(self._users, f, indent=4)
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
return next((user for user in self._users if user["email"] == email), None)
def get_all_users(self) -> List[Dict[str, Any]]:
return self._users
def get_users_by_parent_email(self, parent_email: str) -> List[Dict[str, Any]]:
return [user for user in self._users if user.get("parent_email") == parent_email]
def create_user(self, full_name: str, email: str, password: str, parent_email: Optional[str] = None) -> Dict[str, Any]:
if self.get_user_by_email(email):
raise ValueError("User with this email already exists")
# Hash password with bcrypt
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
user = {
"full_name": full_name,
"email": email,
"password": hashed_password,
"storage_quota_gb": 5, # Default quota
"storage_used_gb": 0,
"reset_token": None,
"reset_token_expiry": None,
"parent_email": parent_email, # New field for hierarchical user management
}
self._users.append(user)
self._save_users()
return user
def update_user(self, email: str, **kwargs) -> Optional[Dict[str, Any]]:
user = self.get_user_by_email(email)
if not user:
return None
for key, value in kwargs.items():
if key == "password":
user[key] = bcrypt.hashpw(value.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
else:
user[key] = value
self._save_users()
return user
def delete_user(self, email: str) -> bool:
initial_len = len(self._users)
self._users = [user for user in self._users if user["email"] != email]
if len(self._users) < initial_len:
self._save_users()
return True
return False
def delete_users_by_parent_email(self, parent_email: str) -> int:
initial_len = len(self._users)
self._users = [user for user in self._users if user.get("parent_email") != parent_email]
deleted_count = initial_len - len(self._users)
if deleted_count > 0:
self._save_users()
return deleted_count
def authenticate_user(self, email: str, password: str) -> bool:
user = self.get_user_by_email(email)
if not user:
return False
# Verify password with bcrypt
return bcrypt.checkpw(password.encode('utf-8'), user["password"].encode('utf-8'))
def get_user_by_reset_token(self, token: str) -> Optional[Dict[str, Any]]:
for user in self._users:
if user.get("reset_token") == token:
expiry_str = user.get("reset_token_expiry")
if expiry_str:
expiry = datetime.datetime.fromisoformat(expiry_str)
if expiry > datetime.datetime.now(datetime.timezone.utc):
return user
return None
def generate_reset_token(self, email: str) -> Optional[str]:
user = self.get_user_by_email(email)
if not user:
return None
token = secrets.token_urlsafe(32)
expiry = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1) # Token valid for 1 hour
user["reset_token"] = token
user["reset_token_expiry"] = expiry.isoformat()
self._save_users()
return token
def validate_reset_token(self, email: str, token: str) -> bool:
user = self.get_user_by_email(email)
if not user or user.get("reset_token") != token:
return False
expiry_str = user.get("reset_token_expiry")
if not expiry_str:
return False
expiry = datetime.datetime.fromisoformat(expiry_str)
return expiry > datetime.datetime.now(datetime.timezone.utc)
def reset_password(self, email: str, token: str, new_password: str) -> bool:
if not self.validate_reset_token(email, token):
return False
user = self.get_user_by_email(email)
if not user: # Should not happen if validate_reset_token passed, but for type safety
return False
hashed_password = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
user["password"] = hashed_password
user["reset_token"] = None
user["reset_token_expiry"] = None
self._save_users()
return True
def update_user_quota(self, email: str, new_quota_gb: float):
user = self.get_user_by_email(email)
if not user:
raise ValueError("User not found")
user["storage_quota_gb"] = new_quota_gb
self._save_users()