import pytest
import json
from retoors.services.user_service import UserService
import bcrypt
import datetime
@pytest.fixture
def users_file(tmp_path):
    """Create an empty ``users.json`` store inside pytest's ``tmp_path``.

    Returns the path of the file, which initially holds an empty JSON list.
    """
    store_path = tmp_path / "users.json"
    # Seed the store with an empty list of users.
    store_path.write_text(json.dumps([]))
    return store_path
@pytest.fixture
def user_service(users_file):
    """Build a ``UserService`` backed by the temporary ``users.json`` file."""
    service = UserService(users_file)
    return service
@pytest.fixture
def populated_user_service(user_service):
    """Return ``user_service`` after seeding it with four users.

    The seed is an admin, a parent, and two children; the children are
    created with the parent's email address as the fourth argument.
    """
    seed_users = (
        ("Admin User", "admin@example.com", "adminpass"),
        ("Parent User", "parent@example.com", "parentpass"),
        ("Child User 1", "child1@example.com", "childpass", "parent@example.com"),
        ("Child User 2", "child2@example.com", "childpass", "parent@example.com"),
    )
    for user_args in seed_users:
        user_service.create_user(*user_args)
    return user_service
async def test_create_user_success(user_service):
    """Check that a created user is returned, retrievable, and bcrypt-verifiable."""
    email = "test@example.com"
    plain_password = "password123"
    created = user_service.create_user("Test User", email, plain_password)
    assert created is not None
    assert created["email"] == email
    assert user_service.get_user_by_email(email) is not None
    # The stored password must verify against the plaintext via bcrypt.
    stored_hash = created["password"].encode("utf-8")
    assert bcrypt.checkpw(plain_password.encode("utf-8"), stored_hash)
async def test_create_user_duplicate_email(user_service):
    """Check that reusing an email raises ``ValueError`` with the expected message."""
    email = "test@example.com"
    user_service.create_user("Test User", email, "password123")
    expected_message = "User with this email already exists"
    with pytest.raises(ValueError, match=expected_message):
        user_service.create_user("Another User", email, "anotherpass")
async def test_get_all_users(populated_user_service):
    """Check that ``get_all_users`` returns all four seeded accounts."""
    all_users = populated_user_service.get_all_users()
    assert len(all_users) == 4
    found_emails = {record["email"] for record in all_users}
    expected_emails = (
        "admin@example.com",
        "parent@example.com",
        "child1@example.com",
        "child2@example.com",
    )
    for expected in expected_emails:
        assert expected in found_emails
async def test_get_users_by_parent_email(populated_user_service):
    """Check child lookup for a parent, an unknown email, and a childless user."""
    service = populated_user_service

    # The parent has exactly the two seeded children.
    kids = service.get_users_by_parent_email("parent@example.com")
    assert len(kids) == 2
    kid_emails = {record["email"] for record in kids}
    for expected in ("child1@example.com", "child2@example.com"):
        assert expected in kid_emails

    # An unknown email and a user without children both give an empty result.
    for childless_email in ("nonexistent@example.com", "admin@example.com"):
        matches = service.get_users_by_parent_email(childless_email)
        assert len(matches) == 0
async def test_update_user_non_password_fields(populated_user_service):
    """Check that ``update_user`` applies name and quota changes and they persist on lookup."""
    email = "admin@example.com"
    returned = populated_user_service.update_user(
        email,
        full_name="Administrator",
        storage_quota_gb=10,
    )
    assert returned is not None
    assert returned["full_name"] == "Administrator"
    assert returned["storage_quota_gb"] == 10

    # A fresh lookup must show the same values.
    stored = populated_user_service.get_user_by_email(email)
    assert stored["full_name"] == "Administrator"
    assert stored["storage_quota_gb"] == 10
async def test_update_user_password(populated_user_service):
    """Check that updating the password enables the new one and disables the old one.

    The authentication results are compared with ``is True`` / ``is False``,
    matching the other ``authenticate_user`` tests in this module, so a
    merely truthy or falsy return value would not pass.
    """
    email = "admin@example.com"
    updated_user = populated_user_service.update_user(email, password="newadminpass")
    assert updated_user is not None
    assert populated_user_service.authenticate_user(email, "newadminpass") is True
    # The password used when the fixture created the user must stop working.
    assert populated_user_service.authenticate_user(email, "adminpass") is False
async def test_update_user_nonexistent(user_service):
    """Check that updating an unknown email returns ``None``."""
    result = user_service.update_user(
        "nonexistent@example.com",
        full_name="Non Existent",
    )
    assert result is None
async def test_delete_user_success(populated_user_service):
    """Check that deleting a seeded user returns ``True`` and removes the record."""
    email = "admin@example.com"
    was_deleted = populated_user_service.delete_user(email)
    assert was_deleted is True
    assert populated_user_service.get_user_by_email(email) is None
    # Three of the four seeded users remain.
    remaining = populated_user_service.get_all_users()
    assert len(remaining) == 3
async def test_delete_user_nonexistent(user_service):
    """Check that deleting an unknown email returns ``False``."""
    was_deleted = user_service.delete_user("nonexistent@example.com")
    assert was_deleted is False
async def test_delete_users_by_parent_email_success(populated_user_service):
    """Check that deleting by parent email removes both children and reports the count."""
    service = populated_user_service
    removed = service.delete_users_by_parent_email("parent@example.com")
    assert removed == 2
    for child_email in ("child1@example.com", "child2@example.com"):
        assert service.get_user_by_email(child_email) is None
    # Admin and Parent users remain.
    survivors = service.get_all_users()
    assert len(survivors) == 2
async def test_delete_users_by_parent_email_no_match(user_service):
    """Check that deleting by an unknown parent email reports zero deletions."""
    removed = user_service.delete_users_by_parent_email("nonexistent@example.com")
    assert removed == 0
async def test_authenticate_user_success(populated_user_service):
    """Check that the seeded admin authenticates with the correct password."""
    outcome = populated_user_service.authenticate_user("admin@example.com", "adminpass")
    assert outcome is True
async def test_authenticate_user_fail_wrong_password(populated_user_service):
    """Check that a wrong password is rejected for an existing user."""
    outcome = populated_user_service.authenticate_user("admin@example.com", "wrongpass")
    assert outcome is False
async def test_authenticate_user_fail_nonexistent_user(user_service):
    """Check that authentication fails for an email that is not in the store."""
    outcome = user_service.authenticate_user("nonexistent@example.com", "anypass")
    assert outcome is False
async def test_generate_reset_token_success(populated_user_service):
    """Check that a generated reset token is stored on the user with a future expiry."""
    email = "admin@example.com"
    issued_token = populated_user_service.generate_reset_token(email)
    assert issued_token is not None

    record = populated_user_service.get_user_by_email(email)
    assert record["reset_token"] == issued_token
    assert record["reset_token_expiry"] is not None

    # The stored expiry is an ISO-8601 string and must lie in the future.
    expires_at = datetime.datetime.fromisoformat(record["reset_token_expiry"])
    current_time = datetime.datetime.now(datetime.timezone.utc)
    assert current_time < expires_at
async def test_generate_reset_token_nonexistent_user(user_service):
    """Check that no reset token is issued for an unknown email."""
    issued_token = user_service.generate_reset_token("nonexistent@example.com")
    assert issued_token is None
async def test_get_user_by_reset_token_valid(populated_user_service):
    """Check that a freshly issued token resolves back to its owner."""
    email = "admin@example.com"
    issued_token = populated_user_service.generate_reset_token(email)
    owner = populated_user_service.get_user_by_reset_token(issued_token)
    assert owner is not None
    assert owner["email"] == email
async def test_get_user_by_reset_token_invalid(populated_user_service):
    """Check that a token that was never issued resolves to no user."""
    owner = populated_user_service.get_user_by_reset_token("invalidtoken")
    assert owner is None
async def test_get_user_by_reset_token_expired(populated_user_service):
    """Check that a token whose expiry lies in the past no longer resolves to a user."""
    service = populated_user_service
    issued_token = service.generate_reset_token("admin@example.com")
    record = service.get_user_by_email("admin@example.com")

    # Manually expire the token by moving its expiry one hour into the past.
    # NOTE(review): presumably `record` is the service's live entry, so that
    # `_save_users()` persists this change -- confirm against UserService.
    one_hour_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
    record["reset_token_expiry"] = one_hour_ago.isoformat()
    service._save_users()  # Save the expired state

    looked_up = service.get_user_by_reset_token(issued_token)
    assert looked_up is None
async def test_validate_reset_token_success(populated_user_service):
    """Check that a freshly issued token validates for its owner's email."""
    email = "admin@example.com"
    issued_token = populated_user_service.generate_reset_token(email)
    is_valid = populated_user_service.validate_reset_token(email, issued_token)
    assert is_valid is True
async def test_validate_reset_token_fail_wrong_token(populated_user_service):
    """Check that a token different from the issued one fails validation."""
    email = "admin@example.com"
    # Issue a real token first so the user has one on record.
    populated_user_service.generate_reset_token(email)
    is_valid = populated_user_service.validate_reset_token(email, "wrongtoken")
    assert is_valid is False
async def test_validate_reset_token_fail_nonexistent_user(user_service):
    """Check that token validation fails for an email that is not in the store."""
    is_valid = user_service.validate_reset_token("nonexistent@example.com", "anytoken")
    assert is_valid is False
async def test_validate_reset_token_fail_expired_token(populated_user_service):
    """Check that a token whose expiry lies in the past fails validation."""
    service = populated_user_service
    email = "admin@example.com"
    issued_token = service.generate_reset_token(email)
    record = service.get_user_by_email(email)

    # Move the expiry one hour into the past and persist it.
    # NOTE(review): presumably `record` is the service's live entry, so that
    # `_save_users()` persists this change -- confirm against UserService.
    one_hour_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
    record["reset_token_expiry"] = one_hour_ago.isoformat()
    service._save_users()

    is_valid = service.validate_reset_token(email, issued_token)
    assert is_valid is False
async def test_reset_password_success(populated_user_service):
    """Check that a valid token resets the password and clears the token fields.

    The authentication result is compared with ``is True``, matching the
    other ``authenticate_user`` tests in this module, so a merely truthy
    return value would not pass.
    """
    email = "admin@example.com"
    token = populated_user_service.generate_reset_token(email)
    assert populated_user_service.reset_password(email, token, "newadminpass") is True
    assert populated_user_service.authenticate_user(email, "newadminpass") is True
    # After a successful reset the token and its expiry must be cleared.
    user = populated_user_service.get_user_by_email(email)
    assert user["reset_token"] is None
    assert user["reset_token_expiry"] is None
async def test_reset_password_fail_invalid_token(populated_user_service):
    """Check that a wrong token is rejected and the original password still works.

    The authentication result is compared with ``is True``, matching the
    other ``authenticate_user`` tests in this module, so a merely truthy
    return value would not pass.
    """
    email = "admin@example.com"
    # Issue a real token so the rejection is due to the mismatch, not its absence.
    populated_user_service.generate_reset_token(email)
    assert populated_user_service.reset_password(email, "invalidtoken", "newadminpass") is False
    # Password should not change
    assert populated_user_service.authenticate_user(email, "adminpass") is True
async def test_reset_password_fail_nonexistent_user(user_service):
    """Check that ``reset_password`` returns ``False`` for an unknown email."""
    # No token can legitimately exist for a user that is not in the store,
    # so any token value must be rejected.
    was_reset = user_service.reset_password(
        "nonexistent@example.com",
        "anytoken",
        "newpass",
    )
    assert was_reset is False
async def test_update_user_quota_success(populated_user_service):
    """Check that ``update_user_quota`` stores the new quota on the user record."""
    email = "admin@example.com"
    new_quota_gb = 20.5
    populated_user_service.update_user_quota(email, new_quota_gb)
    record = populated_user_service.get_user_by_email(email)
    assert record["storage_quota_gb"] == new_quota_gb
async def test_update_user_quota_nonexistent_user(user_service):
    """Check that updating the quota of an unknown email raises ``ValueError``."""
    expected_message = "User not found"
    with pytest.raises(ValueError, match=expected_message):
        user_service.update_user_quota("nonexistent@example.com", 100)