This commit is contained in:
retoor 2025-11-08 22:24:11 +01:00
parent 30f8204e68
commit 2d6debe744
2 changed files with 287 additions and 0 deletions

97
tests/test_env_manager.py Normal file
View File

@ -0,0 +1,97 @@
import pytest
from pathlib import Path
import os
from unittest.mock import patch, MagicMock
from retoors.helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key
from cryptography.fernet import Fernet
@pytest.fixture
def mock_env_path(tmp_path):
"""Fixture to provide a temporary .env file path."""
return tmp_path / ".env"
@pytest.fixture(autouse=True)
def mock_dotenv_load():
"""Mock dotenv.load_dotenv to prevent actual file loading during tests."""
with patch('dotenv.load_dotenv') as mock_load:
yield mock_load
def test_ensure_env_file_exists_creates_file(mock_env_path):
"""Test that .env file is created if it doesn't exist."""
assert not mock_env_path.exists()
ensure_env_file_exists(mock_env_path)
assert mock_env_path.exists()
content = mock_env_path.read_text()
assert "PRICE_PER_GB=0.05" in content
assert "SMTP_HOST=localhost" in content
def test_ensure_env_file_exists_does_not_overwrite(mock_env_path):
"""Test that .env file is not overwritten if it already exists."""
mock_env_path.write_text("EXISTING_VAR=true\n")
ensure_env_file_exists(mock_env_path)
assert mock_env_path.exists()
content = mock_env_path.read_text()
assert "EXISTING_VAR=true" in content
# Ensure default content is not added if file already exists
assert "PRICE_PER_GB=0.05" not in content
@patch('os.getenv')
@patch('cryptography.fernet.Fernet.generate_key')
def test_get_or_create_session_secret_key_generates_new_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that a new key is generated and saved if not found."""
mock_getenv.return_value = None
# Use a dummy key for the mock's return value
mock_generate_key.return_value = b'dummy_generated_key_for_testing_1234567890'
ensure_env_file_exists(mock_env_path) # Ensure .env exists
key = get_or_create_session_secret_key(mock_env_path)
assert key == b'dummy_generated_key_for_testing_1234567890'
mock_generate_key.assert_called_once()
mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path)
content = mock_env_path.read_text()
assert f"SESSION_SECRET_KEY={b'dummy_generated_key_for_testing_1234567890'.decode('utf-8')}" in content
@patch('os.getenv')
def test_get_or_create_session_secret_key_uses_existing_valid_key(mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that an existing valid key is used."""
valid_key = Fernet.generate_key()
mock_getenv.return_value = valid_key.decode('utf-8')
ensure_env_file_exists(mock_env_path) # Ensure .env exists
# Write the valid key to the mock .env file for the function to "find" it
with open(mock_env_path, 'a') as f:
f.write(f'\nSESSION_SECRET_KEY={valid_key.decode("utf-8")}\n')
key = get_or_create_session_secret_key(mock_env_path)
assert key == valid_key
mock_dotenv_load.assert_not_called() # Should not reload if key is found and valid
mock_getenv.assert_called_once_with('SESSION_SECRET_KEY')
@patch('os.getenv')
@patch('cryptography.fernet.Fernet.generate_key')
def test_get_or_create_session_secret_key_generates_on_invalid_existing_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that a new key is generated if the existing one is invalid."""
invalid_key_str = "this-is-an-invalid-key"
mock_getenv.return_value = invalid_key_str
# Use a dummy key for the mock's return value
mock_generate_key.return_value = b'another_dummy_key_for_testing_0987654321'
ensure_env_file_exists(mock_env_path) # Ensure .env exists
# Write the invalid key to the mock .env file
with open(mock_env_path, 'a') as f:
f.write(f'\nSESSION_SECRET_KEY={invalid_key_str}\n')
key = get_or_create_session_secret_key(mock_env_path)
assert key == b'another_dummy_key_for_testing_0987654321'
mock_generate_key.assert_called_once()
mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path)
content = mock_env_path.read_text()
assert f"SESSION_SECRET_KEY={b'another_dummy_key_for_testing_0987654321'.decode('utf-8')}" in content
# Ensure the invalid key is effectively replaced or a new one appended
# The current implementation appends, so we check for both
assert invalid_key_str in content

190
tests/test_user_service.py Normal file
View File

@ -0,0 +1,190 @@
import pytest
from pathlib import Path
import json
from retoors.services.user_service import UserService
import bcrypt
import datetime
@pytest.fixture
def users_file(tmp_path):
"""Fixture to create a temporary users.json file for testing."""
file = tmp_path / "users.json"
with open(file, "w") as f:
json.dump([], f) # Start with an empty list of users
return file
@pytest.fixture
def user_service(users_file):
"""Fixture to provide a UserService instance with a temporary users.json."""
return UserService(users_file)
@pytest.fixture
def populated_user_service(user_service):
"""Fixture to provide a UserService instance with some pre-populated users."""
user_service.create_user("Admin User", "admin@example.com", "adminpass")
user_service.create_user("Parent User", "parent@example.com", "parentpass")
user_service.create_user("Child User 1", "child1@example.com", "childpass", "parent@example.com")
user_service.create_user("Child User 2", "child2@example.com", "childpass", "parent@example.com")
return user_service
async def test_create_user_success(user_service):
user = user_service.create_user("Test User", "test@example.com", "password123")
assert user is not None
assert user["email"] == "test@example.com"
assert user_service.get_user_by_email("test@example.com") is not None
assert bcrypt.checkpw(b"password123", user["password"].encode('utf-8'))
async def test_create_user_duplicate_email(user_service):
user_service.create_user("Test User", "test@example.com", "password123")
with pytest.raises(ValueError, match="User with this email already exists"):
user_service.create_user("Another User", "test@example.com", "anotherpass")
async def test_get_all_users(populated_user_service):
users = populated_user_service.get_all_users()
assert len(users) == 4
emails = {user["email"] for user in users}
assert "admin@example.com" in emails
assert "parent@example.com" in emails
assert "child1@example.com" in emails
assert "child2@example.com" in emails
async def test_get_users_by_parent_email(populated_user_service):
children = populated_user_service.get_users_by_parent_email("parent@example.com")
assert len(children) == 2
child_emails = {user["email"] for user in children}
assert "child1@example.com" in child_emails
assert "child2@example.com" in child_emails
no_children = populated_user_service.get_users_by_parent_email("nonexistent@example.com")
assert len(no_children) == 0
admin_children = populated_user_service.get_users_by_parent_email("admin@example.com")
assert len(admin_children) == 0
async def test_update_user_non_password_fields(populated_user_service):
updated_user = populated_user_service.update_user("admin@example.com", full_name="Administrator", storage_quota_gb=10)
assert updated_user is not None
assert updated_user["full_name"] == "Administrator"
assert updated_user["storage_quota_gb"] == 10
retrieved_user = populated_user_service.get_user_by_email("admin@example.com")
assert retrieved_user["full_name"] == "Administrator"
assert retrieved_user["storage_quota_gb"] == 10
async def test_update_user_password(populated_user_service):
updated_user = populated_user_service.update_user("admin@example.com", password="newadminpass")
assert updated_user is not None
assert populated_user_service.authenticate_user("admin@example.com", "newadminpass")
assert not populated_user_service.authenticate_user("admin@example.com", "adminpass")
async def test_update_user_nonexistent(user_service):
updated_user = user_service.update_user("nonexistent@example.com", full_name="Non Existent")
assert updated_user is None
async def test_delete_user_success(populated_user_service):
assert populated_user_service.delete_user("admin@example.com") is True
assert populated_user_service.get_user_by_email("admin@example.com") is None
assert len(populated_user_service.get_all_users()) == 3
async def test_delete_user_nonexistent(user_service):
assert user_service.delete_user("nonexistent@example.com") is False
async def test_delete_users_by_parent_email_success(populated_user_service):
deleted_count = populated_user_service.delete_users_by_parent_email("parent@example.com")
assert deleted_count == 2
assert populated_user_service.get_user_by_email("child1@example.com") is None
assert populated_user_service.get_user_by_email("child2@example.com") is None
assert len(populated_user_service.get_all_users()) == 2 # Admin and Parent users remain
async def test_delete_users_by_parent_email_no_match(user_service):
deleted_count = user_service.delete_users_by_parent_email("nonexistent@example.com")
assert deleted_count == 0
async def test_authenticate_user_success(populated_user_service):
assert populated_user_service.authenticate_user("admin@example.com", "adminpass") is True
async def test_authenticate_user_fail_wrong_password(populated_user_service):
assert populated_user_service.authenticate_user("admin@example.com", "wrongpass") is False
async def test_authenticate_user_fail_nonexistent_user(user_service):
assert user_service.authenticate_user("nonexistent@example.com", "anypass") is False
async def test_generate_reset_token_success(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
assert token is not None
user = populated_user_service.get_user_by_email("admin@example.com")
assert user["reset_token"] == token
assert user["reset_token_expiry"] is not None
# Check expiry is in the future
expiry_dt = datetime.datetime.fromisoformat(user["reset_token_expiry"])
assert expiry_dt > datetime.datetime.now(datetime.timezone.utc)
async def test_generate_reset_token_nonexistent_user(user_service):
token = user_service.generate_reset_token("nonexistent@example.com")
assert token is None
async def test_get_user_by_reset_token_valid(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
user = populated_user_service.get_user_by_reset_token(token)
assert user is not None
assert user["email"] == "admin@example.com"
async def test_get_user_by_reset_token_invalid(populated_user_service):
user = populated_user_service.get_user_by_reset_token("invalidtoken")
assert user is None
async def test_get_user_by_reset_token_expired(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
user = populated_user_service.get_user_by_email("admin@example.com")
# Manually expire the token
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
populated_user_service._save_users() # Save the expired state
user_after_expiry = populated_user_service.get_user_by_reset_token(token)
assert user_after_expiry is None
async def test_validate_reset_token_success(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
assert populated_user_service.validate_reset_token("admin@example.com", token) is True
async def test_validate_reset_token_fail_wrong_token(populated_user_service):
populated_user_service.generate_reset_token("admin@example.com")
assert populated_user_service.validate_reset_token("admin@example.com", "wrongtoken") is False
async def test_validate_reset_token_fail_nonexistent_user(user_service):
assert user_service.validate_reset_token("nonexistent@example.com", "anytoken") is False
async def test_validate_reset_token_fail_expired_token(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
user = populated_user_service.get_user_by_email("admin@example.com")
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
populated_user_service._save_users()
assert populated_user_service.validate_reset_token("admin@example.com", token) is False
async def test_reset_password_success(populated_user_service):
token = populated_user_service.generate_reset_token("admin@example.com")
assert populated_user_service.reset_password("admin@example.com", token, "newadminpass") is True
assert populated_user_service.authenticate_user("admin@example.com", "newadminpass")
user = populated_user_service.get_user_by_email("admin@example.com")
assert user["reset_token"] is None
assert user["reset_token_expiry"] is None
async def test_reset_password_fail_invalid_token(populated_user_service):
populated_user_service.generate_reset_token("admin@example.com")
assert populated_user_service.reset_password("admin@example.com", "invalidtoken", "newadminpass") is False
assert populated_user_service.authenticate_user("admin@example.com", "adminpass") # Password should not change
async def test_reset_password_fail_nonexistent_user(user_service):
# Even if a token was somehow generated for a nonexistent user (which shouldn't happen),
# reset_password should fail.
assert user_service.reset_password("nonexistent@example.com", "anytoken", "newpass") is False
async def test_update_user_quota_success(populated_user_service):
populated_user_service.update_user_quota("admin@example.com", 20.5)
user = populated_user_service.get_user_by_email("admin@example.com")
assert user["storage_quota_gb"] == 20.5
async def test_update_user_quota_nonexistent_user(user_service):
with pytest.raises(ValueError, match="User not found"):
user_service.update_user_quota("nonexistent@example.com", 100)