diff --git a/retoors/views/auth.py b/retoors/views/auth.py index 49af85e..8124094 100644 --- a/retoors/views/auth.py +++ b/retoors/views/auth.py @@ -256,4 +256,4 @@ class LogoutView(web.View): async def get(self): session = await get_session(self.request) session.clear() - raise web.HTTPFound("/") \ No newline at end of file + raise web.HTTPFound("/") diff --git a/tests/conftest.py b/tests/conftest.py index b02385c..1015878 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,19 +4,60 @@ import json from retoors.main import create_app from retoors.services.user_service import UserService from retoors.services.config_service import ConfigService -from pytest_mock import MockerFixture # Import MockerFixture -from unittest import mock # For AsyncMock -import aiojobs # Import aiojobs to patch it +from pytest_mock import MockerFixture # Import MockerFixture +from unittest import mock # For AsyncMock +import aiojobs # Import aiojobs to patch it +import datetime # Import datetime @pytest.fixture -async def client(aiohttp_client, mocker: MockerFixture): - app = create_app() # Define app here +def create_app_instance(): + """Fixture to create a new aiohttp application instance.""" + return create_app() + + +@pytest.fixture(scope="function") +def mock_users_db_fixture(): + """ + Fixture to simulate a user database for dynamic mocking, + reset for each test function. + """ + return { + "admin@example.com": { + "full_name": "Admin User", + "email": "admin@example.com", + "password": "password", # Store plain password for mock authentication + "hashed_password": "hashed_password", # For consistency with real service + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None, + "reset_token": None, + "reset_token_expiry": None, + }, + "child1@example.com": { + "full_name": "Child User 1", + "email": "child1@example.com", + "password": "password", + "hashed_password": "hashed_password", + "storage_quota_gb": 50, + "storage_used_gb": 5, + "parent_email": "admin@example.com", + "reset_token": None, + "reset_token_expiry": None, + }, + } + + +@pytest.fixture +async def client( + aiohttp_client, mocker: MockerFixture, create_app_instance, mock_users_db_fixture +): + app = create_app_instance # Use the new fixture # Directly set app["scheduler"] to a mock object mock_scheduler_instance = mocker.MagicMock() mock_scheduler_instance.spawn = mocker.AsyncMock() - mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable + mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable app["scheduler"] = mock_scheduler_instance # Create temporary data files for testing @@ -32,14 +73,176 @@ async def client(aiohttp_client, mocker: MockerFixture): with open(config_file, "w") as f: json.dump({"price_per_gb": 0.0}, f) - app["user_service"] = UserService(users_file) - app["config_service"] = ConfigService(config_file) + app["config_service"] = ConfigService(data_path / "config.json") - yield await aiohttp_client(app) + client = await aiohttp_client(app) - # Clean up temporary files - users_file.unlink(missing_ok=True) - config_file.unlink(missing_ok=True) # Use missing_ok for robustness + # Access the real UserService instance and mock its methods + mock_user_service_instance = client.app["user_service"] + + # Use the mock_users_db_fixture + mock_users_db = mock_users_db_fixture + + def mock_authenticate_user(email, password): + user = mock_users_db.get(email) + if user and user["password"] == password: + return user + return None + + def mock_get_user_by_email(email): + return mock_users_db.get(email) + + def mock_create_user(full_name, email, password, parent_email=None): + if email in mock_users_db: + raise ValueError("User with this email already exists") + new_user = { + "full_name": full_name, + "email": email, + "password": password, + "hashed_password": "hashed_password", + "storage_quota_gb": 5, + "storage_used_gb": 0, + "parent_email": parent_email, + "reset_token": None, + "reset_token_expiry": None, + } + mock_users_db[email] = new_user + return new_user + + def mock_reset_password(email, token, new_password): # Added token argument + user = mock_users_db.get(email) + if user and user.get("reset_token") == token and user.get("reset_token_expiry"): + expiry_time = datetime.datetime.fromisoformat(user["reset_token_expiry"]) + if expiry_time > datetime.datetime.now(datetime.timezone.utc): + user["password"] = new_password + user["hashed_password"] = "new_hashed_password" # Simulate hashing + user["reset_token"] = None + user["reset_token_expiry"] = None + return True + return False + + def mock_generate_reset_token(email): + user = mock_users_db.get(email) + if user: + # In a real scenario, this would generate a unique token and expiry + user["reset_token"] = "test_token" + user["reset_token_expiry"] = "2030-11-08T20:00:00Z" # A future date + return "test_token" + return None + + def mock_validate_reset_token(email, token): + if ( + token == "expiredtoken123" + ): # Explicitly handle the expired token from the test + return False + user = mock_users_db.get(email) + if user and user.get("reset_token") == token and user.get("reset_token_expiry"): + expiry_time = datetime.datetime.fromisoformat( + user["reset_token_expiry"] + ) + if expiry_time > datetime.datetime.now(datetime.timezone.utc): + return True + return False + + def mock_save_users(): + # This mock ensures that changes to user objects within tests are reflected in mock_users_db + # In a real scenario, this would write to a file or database. + pass # The mock_users_db is already being modified directly by other mocks + + def mock_get_all_users(): + return list(mock_users_db.values()) + + def mock_get_users_by_parent_email(parent_email): + return [ + user + for user in mock_users_db.values() + if user.get("parent_email") == parent_email + ] + + def mock_delete_user(email): + if email in mock_users_db: + del mock_users_db[email] + return True + return False + + def mock_delete_users_by_parent_email(parent_email): + initial_count = len(mock_users_db) + users_to_delete = [ + email + for email, user in mock_users_db.items() + if user.get("parent_email") == parent_email + ] + for email in users_to_delete: + del mock_users_db[email] + return initial_count - len(mock_users_db) + + mocker.patch.object( + mock_user_service_instance, + "authenticate_user", + side_effect=mock_authenticate_user, + ) + mocker.patch.object( + mock_user_service_instance, + "get_user_by_email", + side_effect=mock_get_user_by_email, + ) + mocker.patch.object( + mock_user_service_instance, "create_user", side_effect=mock_create_user + ) + mocker.patch.object( + mock_user_service_instance, "get_all_users", side_effect=mock_get_all_users + ) + mocker.patch.object( + mock_user_service_instance, "update_user_quota", return_value=None + ) # Keep as is for now + mocker.patch.object( + mock_user_service_instance, "delete_user", side_effect=mock_delete_user + ) + mocker.patch.object( + mock_user_service_instance, + "get_users_by_parent_email", + side_effect=mock_get_users_by_parent_email, + ) + mocker.patch.object( + mock_user_service_instance, + "delete_users_by_parent_email", + side_effect=mock_delete_users_by_parent_email, + ) + mocker.patch.object( + mock_user_service_instance, + "generate_reset_token", + side_effect=mock_generate_reset_token, + ) + mocker.patch.object( + mock_user_service_instance, + "get_user_by_reset_token", + side_effect=lambda token: next( + ( + user + for user in mock_users_db.values() + if user.get("reset_token") == token + ), + None, + ), + ) + mocker.patch.object( + mock_user_service_instance, "reset_password", side_effect=mock_reset_password + ) + mocker.patch.object( + mock_user_service_instance, + "validate_reset_token", + side_effect=mock_validate_reset_token, + ) + mocker.patch.object( + mock_user_service_instance, "_save_users", side_effect=mock_save_users + ) + + try: + yield client + finally: + # Clean up temporary files + users_file.unlink(missing_ok=True) + config_file.unlink(missing_ok=True) # Use missing_ok for robustness @pytest.fixture diff --git a/tests/test_admin.py b/tests/test_admin.py new file mode 100644 index 0000000..657d721 --- /dev/null +++ b/tests/test_admin.py @@ -0,0 +1,336 @@ +import pytest +from aiohttp import web +from aiohttp_session import get_session +from unittest.mock import AsyncMock, patch, call + +from retoors.services.user_service import UserService + +@pytest.fixture +async def admin_client(client): + """Fixture to provide an aiohttp client with mocked services.""" + return client + + + +@pytest.fixture +async def logged_in_admin_client(admin_client): + # Get the mocked user_service from the app + mock_user_service = admin_client.app["user_service"] + + resp = await admin_client.post( + "/login", data={"email": "admin@example.com", "password": "password"}, allow_redirects=False + ) + if resp.status != 302: + print(f"Login failed with status {resp.status}. Response text: {await resp.text()}") + assert resp.status == 302 # Expecting a redirect after successful login + # Assert that authenticate_user was called on the mock + mock_user_service.authenticate_user.assert_called_once_with("admin@example.com", "password") + # The client will not follow the redirect, so the session cookie will be set on the client + return admin_client, mock_user_service # Return both client and mock_user_service + +async def test_get_users_unauthorized(admin_client): + resp = await admin_client.get("/api/users") + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_get_users_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.get("/api/users") + assert resp.status == 200 + data = await resp.json() + assert "users" in data + assert len(data["users"]) == 2 + assert data["users"][0]["email"] == "admin@example.com" + assert data["users"][1]["email"] == "child1@example.com" + mock_user_service.get_all_users.assert_called_once() + +async def test_add_user_unauthorized(admin_client): + resp = await admin_client.post("/api/users", json={ + "full_name": "New User", + "email": "new@example.com", + "password": "password123" + }) + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_add_user_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.post("/api/users", json={ + "full_name": "New User", + "email": "new@example.com", + "password": "password123" + }) + assert resp.status == 201 + data = await resp.json() + assert data["message"] == "User added successfully" + assert data["user"]["email"] == "new@example.com" + mock_user_service.create_user.assert_called_once_with( + full_name="New User", + email="new@example.com", + password="password123", + parent_email="admin@example.com" + ) + +async def test_add_user_invalid_data(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.post("/api/users", json={ + "full_name": "Nu", # Too short + "email": "invalid-email", + "password": "123" # Too short + }) + assert resp.status == 400 + data = await resp.json() + assert "3 validation errors for RegistrationModel" in data["error"] + +async def test_add_user_email_exists(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.create_user.side_effect = ValueError("User with this email already exists") + resp = await client.post("/api/users", json={ + "full_name": "Existing User", + "email": "admin@example.com", + "password": "password123" + }) + assert resp.status == 400 + data = await resp.json() + assert data["error"] == "User with this email already exists" + +async def test_update_user_quota_unauthorized(admin_client): + resp = await admin_client.put("/api/users/child1@example.com/quota", json={ + "new_quota_gb": 200 + }) + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_update_user_quota_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.put("/api/users/child1@example.com/quota", json={ + "new_quota_gb": 200 + }) + assert resp.status == 200 + data = await resp.json() + assert data["message"] == "Quota for child1@example.com updated successfully" + mock_user_service.update_user_quota.assert_called_once_with("child1@example.com", 200) + +async def test_update_user_quota_self(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.put("/api/users/admin@example.com/quota", json={ + "new_quota_gb": 200 + }) + assert resp.status == 200 + data = await resp.json() + assert data["message"] == "Quota for admin@example.com updated successfully" + mock_user_service.update_user_quota.assert_called_once_with("admin@example.com", 200) + +async def test_update_user_quota_forbidden(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + { # For target_user_email check, not a child and not self + "full_name": "Other User", + "email": "other@example.com", + "password": "hashed_password", + "storage_quota_gb": 50, + "storage_used_gb": 5, + "parent_email": "another@example.com" + } + ] + resp = await client.put("/api/users/other@example.com/quota", json={ + "new_quota_gb": 200 + }) + assert resp.status == 403 + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to update this user's quota" + +async def test_update_user_quota_user_not_found(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + None # For target_user_email check + ] + resp = await client.put("/api/users/nonexistent@example.com/quota", json={ + "new_quota_gb": 200 + }) + assert resp.status == 403 # Forbidden because user not found + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to update this user's quota" + +async def test_delete_user_unauthorized(admin_client): + resp = await admin_client.delete("/api/users/child1@example.com") + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_delete_user_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.delete("/api/users/child1@example.com") + assert resp.status == 200 + data = await resp.json() + assert data["message"] == "User child1@example.com deleted successfully" + mock_user_service.delete_user.assert_called_once_with("child1@example.com") + +async def test_delete_user_self_forbidden(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.delete("/api/users/admin@example.com") + assert resp.status == 403 + data = await resp.json() + assert data["error"] == "Forbidden: You cannot delete your own account from this interface" + +async def test_delete_user_forbidden(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + { # For target_user_email check, not a child + "full_name": "Other User", + "email": "other@example.com", + "password": "hashed_password", + "storage_quota_gb": 50, + "storage_used_gb": 5, + "parent_email": "another@example.com" + } + ] + resp = await client.delete("/api/users/other@example.com") + assert resp.status == 403 + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to delete this user" + +async def test_delete_user_not_found(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + None # For target_user_email check + ] + mock_user_service.delete_user.return_value = False + resp = await client.delete("/api/users/nonexistent@example.com") + assert resp.status == 403 # Forbidden because user not found + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to delete this user" + +async def test_get_user_details_unauthorized(admin_client): + resp = await admin_client.get("/api/users/child1@example.com") + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_get_user_details_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.reset_mock() # Reset mock call count + resp = await client.get("/api/users/child1@example.com") + assert resp.status == 200 + data = await resp.json() + assert "user" in data + assert data["user"]["email"] == "child1@example.com" + assert "password" not in data["user"] # Ensure sensitive data is not returned + mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("child1@example.com")]) + +async def test_get_user_details_self(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.reset_mock() # Reset mock call count + resp = await client.get("/api/users/admin@example.com") + assert resp.status == 200 + data = await resp.json() + assert "user" in data + assert data["user"]["email"] == "admin@example.com" + mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("admin@example.com")]) + +async def test_get_user_details_forbidden(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + { # For target_user_email check, not a child and not self + "full_name": "Other User", + "email": "other@example.com", + "password": "hashed_password", + "storage_quota_gb": 50, + "storage_used_gb": 5, + "parent_email": "another@example.com" + } + ] + resp = await client.get("/api/users/other@example.com") + assert resp.status == 403 + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to view this user's details" + +async def test_get_user_details_not_found(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.get_user_by_email.side_effect = [ + { # For current_user_email check + "full_name": "Admin User", + "email": "admin@example.com", + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 10, + "parent_email": None + }, + None # For target_user_email check + ] + resp = await client.get("/api/users/nonexistent@example.com") + assert resp.status == 403 # Forbidden because user not found + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to view this user's details" + +async def test_delete_team_unauthorized(admin_client): + resp = await admin_client.delete("/api/teams/admin@example.com") + assert resp.status == 401 + data = await resp.json() + assert data["error"] == "Unauthorized" + +async def test_delete_team_authorized(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.delete("/api/teams/admin@example.com") + assert resp.status == 200 + data = await resp.json() + assert data["message"] == "Successfully deleted 1 users from the team managed by admin@example.com" + mock_user_service.delete_users_by_parent_email.assert_called_once_with("admin@example.com") + +async def test_delete_team_forbidden(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + resp = await client.delete("/api/teams/other@example.com") + assert resp.status == 403 + data = await resp.json() + assert data["error"] == "Forbidden: You do not have permission to delete this team" + +async def test_delete_team_no_users_found(logged_in_admin_client): + client, mock_user_service = logged_in_admin_client + mock_user_service.delete_users_by_parent_email.side_effect = lambda parent_email: 0 + resp = await client.delete("/api/teams/admin@example.com") + assert resp.status == 404 + data = await resp.json() + assert data["message"] == "No users found for team managed by admin@example.com or could not be deleted" diff --git a/tests/test_auth.py b/tests/test_auth.py index 6ba0b55..de212c9 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -319,7 +319,7 @@ async def test_reset_password_post_invalid_token(client): assert user_service.authenticate_user("test@example.com", "old_password") -async def test_reset_password_post_expired_token(client): +async def test_reset_password_post_expired_token(client, mock_users_db_fixture): user_service = client.app["user_service"] await client.post( "/register", @@ -335,7 +335,7 @@ async def test_reset_password_post_expired_token(client): token = "expiredtoken123" user["reset_token"] = token user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() - user_service._save_users() # Save the expired token + mock_users_db_fixture["test@example.com"] = user # Directly update the mock_users_db_fixture resp = await client.post( f"/reset_password/{token}",