diff --git a/tests/test_env_manager.py b/tests/test_env_manager.py new file mode 100644 index 0000000..4017e8a --- /dev/null +++ b/tests/test_env_manager.py @@ -0,0 +1,97 @@ +import pytest +from pathlib import Path +import os +from unittest.mock import patch, MagicMock +from retoors.helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key +from cryptography.fernet import Fernet + +@pytest.fixture +def mock_env_path(tmp_path): + """Fixture to provide a temporary .env file path.""" + return tmp_path / ".env" + +@pytest.fixture(autouse=True) +def mock_dotenv_load(): + """Mock dotenv.load_dotenv to prevent actual file loading during tests.""" + with patch('dotenv.load_dotenv') as mock_load: + yield mock_load + +def test_ensure_env_file_exists_creates_file(mock_env_path): + """Test that .env file is created if it doesn't exist.""" + assert not mock_env_path.exists() + ensure_env_file_exists(mock_env_path) + assert mock_env_path.exists() + content = mock_env_path.read_text() + assert "PRICE_PER_GB=0.05" in content + assert "SMTP_HOST=localhost" in content + +def test_ensure_env_file_exists_does_not_overwrite(mock_env_path): + """Test that .env file is not overwritten if it already exists.""" + mock_env_path.write_text("EXISTING_VAR=true\n") + ensure_env_file_exists(mock_env_path) + assert mock_env_path.exists() + content = mock_env_path.read_text() + assert "EXISTING_VAR=true" in content + # Ensure default content is not added if file already exists + assert "PRICE_PER_GB=0.05" not in content + +@patch('os.getenv') +@patch('cryptography.fernet.Fernet.generate_key') +def test_get_or_create_session_secret_key_generates_new_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load): + """Test that a new key is generated and saved if not found.""" + mock_getenv.return_value = None + # Use a dummy key for the mock's return value + mock_generate_key.return_value = b'dummy_generated_key_for_testing_1234567890' + + ensure_env_file_exists(mock_env_path) # Ensure .env exists + key = get_or_create_session_secret_key(mock_env_path) + + assert key == b'dummy_generated_key_for_testing_1234567890' + mock_generate_key.assert_called_once() + mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path) + + content = mock_env_path.read_text() + assert f"SESSION_SECRET_KEY={b'dummy_generated_key_for_testing_1234567890'.decode('utf-8')}" in content + +@patch('os.getenv') +def test_get_or_create_session_secret_key_uses_existing_valid_key(mock_getenv, mock_env_path, mock_dotenv_load): + """Test that an existing valid key is used.""" + valid_key = Fernet.generate_key() + mock_getenv.return_value = valid_key.decode('utf-8') + + ensure_env_file_exists(mock_env_path) # Ensure .env exists + # Write the valid key to the mock .env file for the function to "find" it + with open(mock_env_path, 'a') as f: + f.write(f'\nSESSION_SECRET_KEY={valid_key.decode("utf-8")}\n') + + key = get_or_create_session_secret_key(mock_env_path) + + assert key == valid_key + mock_dotenv_load.assert_not_called() # Should not reload if key is found and valid + mock_getenv.assert_called_once_with('SESSION_SECRET_KEY') + +@patch('os.getenv') +@patch('cryptography.fernet.Fernet.generate_key') +def test_get_or_create_session_secret_key_generates_on_invalid_existing_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load): + """Test that a new key is generated if the existing one is invalid.""" + invalid_key_str = "this-is-an-invalid-key" + mock_getenv.return_value = invalid_key_str + # Use a dummy key for the mock's return value + mock_generate_key.return_value = b'another_dummy_key_for_testing_0987654321' + + ensure_env_file_exists(mock_env_path) # Ensure .env exists + # Write the invalid key to the mock .env file + with open(mock_env_path, 'a') as f: + f.write(f'\nSESSION_SECRET_KEY={invalid_key_str}\n') + + key = get_or_create_session_secret_key(mock_env_path) + + assert key == b'another_dummy_key_for_testing_0987654321' + mock_generate_key.assert_called_once() + mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path) + + content = mock_env_path.read_text() + assert f"SESSION_SECRET_KEY={b'another_dummy_key_for_testing_0987654321'.decode('utf-8')}" in content + # Ensure the invalid key is effectively replaced or a new one appended + # The current implementation appends, so we check for both + assert invalid_key_str in content diff --git a/tests/test_user_service.py b/tests/test_user_service.py new file mode 100644 index 0000000..2e6e027 --- /dev/null +++ b/tests/test_user_service.py @@ -0,0 +1,190 @@ +import pytest +from pathlib import Path +import json +from retoors.services.user_service import UserService +import bcrypt +import datetime + +@pytest.fixture +def users_file(tmp_path): + """Fixture to create a temporary users.json file for testing.""" + file = tmp_path / "users.json" + with open(file, "w") as f: + json.dump([], f) # Start with an empty list of users + return file + +@pytest.fixture +def user_service(users_file): + """Fixture to provide a UserService instance with a temporary users.json.""" + return UserService(users_file) + +@pytest.fixture +def populated_user_service(user_service): + """Fixture to provide a UserService instance with some pre-populated users.""" + user_service.create_user("Admin User", "admin@example.com", "adminpass") + user_service.create_user("Parent User", "parent@example.com", "parentpass") + user_service.create_user("Child User 1", "child1@example.com", "childpass", "parent@example.com") + user_service.create_user("Child User 2", "child2@example.com", "childpass", "parent@example.com") + return user_service + +async def test_create_user_success(user_service): + user = user_service.create_user("Test User", "test@example.com", "password123") + assert user is not None + assert user["email"] == "test@example.com" + assert user_service.get_user_by_email("test@example.com") is not None + assert bcrypt.checkpw(b"password123", user["password"].encode('utf-8')) + +async def test_create_user_duplicate_email(user_service): + user_service.create_user("Test User", "test@example.com", "password123") + with pytest.raises(ValueError, match="User with this email already exists"): + user_service.create_user("Another User", "test@example.com", "anotherpass") + +async def test_get_all_users(populated_user_service): + users = populated_user_service.get_all_users() + assert len(users) == 4 + emails = {user["email"] for user in users} + assert "admin@example.com" in emails + assert "parent@example.com" in emails + assert "child1@example.com" in emails + assert "child2@example.com" in emails + +async def test_get_users_by_parent_email(populated_user_service): + children = populated_user_service.get_users_by_parent_email("parent@example.com") + assert len(children) == 2 + child_emails = {user["email"] for user in children} + assert "child1@example.com" in child_emails + assert "child2@example.com" in child_emails + + no_children = populated_user_service.get_users_by_parent_email("nonexistent@example.com") + assert len(no_children) == 0 + + admin_children = populated_user_service.get_users_by_parent_email("admin@example.com") + assert len(admin_children) == 0 + +async def test_update_user_non_password_fields(populated_user_service): + updated_user = populated_user_service.update_user("admin@example.com", full_name="Administrator", storage_quota_gb=10) + assert updated_user is not None + assert updated_user["full_name"] == "Administrator" + assert updated_user["storage_quota_gb"] == 10 + + retrieved_user = populated_user_service.get_user_by_email("admin@example.com") + assert retrieved_user["full_name"] == "Administrator" + assert retrieved_user["storage_quota_gb"] == 10 + +async def test_update_user_password(populated_user_service): + updated_user = populated_user_service.update_user("admin@example.com", password="newadminpass") + assert updated_user is not None + assert populated_user_service.authenticate_user("admin@example.com", "newadminpass") + assert not populated_user_service.authenticate_user("admin@example.com", "adminpass") + +async def test_update_user_nonexistent(user_service): + updated_user = user_service.update_user("nonexistent@example.com", full_name="Non Existent") + assert updated_user is None + +async def test_delete_user_success(populated_user_service): + assert populated_user_service.delete_user("admin@example.com") is True + assert populated_user_service.get_user_by_email("admin@example.com") is None + assert len(populated_user_service.get_all_users()) == 3 + +async def test_delete_user_nonexistent(user_service): + assert user_service.delete_user("nonexistent@example.com") is False + +async def test_delete_users_by_parent_email_success(populated_user_service): + deleted_count = populated_user_service.delete_users_by_parent_email("parent@example.com") + assert deleted_count == 2 + assert populated_user_service.get_user_by_email("child1@example.com") is None + assert populated_user_service.get_user_by_email("child2@example.com") is None + assert len(populated_user_service.get_all_users()) == 2 # Admin and Parent users remain + +async def test_delete_users_by_parent_email_no_match(user_service): + deleted_count = user_service.delete_users_by_parent_email("nonexistent@example.com") + assert deleted_count == 0 + +async def test_authenticate_user_success(populated_user_service): + assert populated_user_service.authenticate_user("admin@example.com", "adminpass") is True + +async def test_authenticate_user_fail_wrong_password(populated_user_service): + assert populated_user_service.authenticate_user("admin@example.com", "wrongpass") is False + +async def test_authenticate_user_fail_nonexistent_user(user_service): + assert user_service.authenticate_user("nonexistent@example.com", "anypass") is False + +async def test_generate_reset_token_success(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + assert token is not None + user = populated_user_service.get_user_by_email("admin@example.com") + assert user["reset_token"] == token + assert user["reset_token_expiry"] is not None + # Check expiry is in the future + expiry_dt = datetime.datetime.fromisoformat(user["reset_token_expiry"]) + assert expiry_dt > datetime.datetime.now(datetime.timezone.utc) + +async def test_generate_reset_token_nonexistent_user(user_service): + token = user_service.generate_reset_token("nonexistent@example.com") + assert token is None + +async def test_get_user_by_reset_token_valid(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + user = populated_user_service.get_user_by_reset_token(token) + assert user is not None + assert user["email"] == "admin@example.com" + +async def test_get_user_by_reset_token_invalid(populated_user_service): + user = populated_user_service.get_user_by_reset_token("invalidtoken") + assert user is None + +async def test_get_user_by_reset_token_expired(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + user = populated_user_service.get_user_by_email("admin@example.com") + # Manually expire the token + user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() + populated_user_service._save_users() # Save the expired state + + user_after_expiry = populated_user_service.get_user_by_reset_token(token) + assert user_after_expiry is None + +async def test_validate_reset_token_success(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + assert populated_user_service.validate_reset_token("admin@example.com", token) is True + +async def test_validate_reset_token_fail_wrong_token(populated_user_service): + populated_user_service.generate_reset_token("admin@example.com") + assert populated_user_service.validate_reset_token("admin@example.com", "wrongtoken") is False + +async def test_validate_reset_token_fail_nonexistent_user(user_service): + assert user_service.validate_reset_token("nonexistent@example.com", "anytoken") is False + +async def test_validate_reset_token_fail_expired_token(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + user = populated_user_service.get_user_by_email("admin@example.com") + user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() + populated_user_service._save_users() + + assert populated_user_service.validate_reset_token("admin@example.com", token) is False + +async def test_reset_password_success(populated_user_service): + token = populated_user_service.generate_reset_token("admin@example.com") + assert populated_user_service.reset_password("admin@example.com", token, "newadminpass") is True + assert populated_user_service.authenticate_user("admin@example.com", "newadminpass") + user = populated_user_service.get_user_by_email("admin@example.com") + assert user["reset_token"] is None + assert user["reset_token_expiry"] is None + +async def test_reset_password_fail_invalid_token(populated_user_service): + populated_user_service.generate_reset_token("admin@example.com") + assert populated_user_service.reset_password("admin@example.com", "invalidtoken", "newadminpass") is False + assert populated_user_service.authenticate_user("admin@example.com", "adminpass") # Password should not change + +async def test_reset_password_fail_nonexistent_user(user_service): + # Even if a token was somehow generated for a nonexistent user (which shouldn't happen), + # reset_password should fail. + assert user_service.reset_password("nonexistent@example.com", "anytoken", "newpass") is False + +async def test_update_user_quota_success(populated_user_service): + populated_user_service.update_user_quota("admin@example.com", 20.5) + user = populated_user_service.get_user_by_email("admin@example.com") + assert user["storage_quota_gb"] == 20.5 + +async def test_update_user_quota_nonexistent_user(user_service): + with pytest.raises(ValueError, match="User not found"): + user_service.update_user_quota("nonexistent@example.com", 100)