From 925f91a17c3419904808547a1e33394b934b0624 Mon Sep 17 00:00:00 2001 From: retoor Date: Sun, 9 Nov 2025 10:02:10 +0100 Subject: [PATCH] Update. --- tests/conftest.py | 195 +---------- tests/test_admin.py | 333 ------------------- tests/test_auth.py | 377 --------------------- tests/test_dashboard.py | 21 ++ tests/test_env_manager.py | 95 ------ tests/test_file_browser.py | 606 ---------------------------------- tests/test_files.py | 102 ++++++ tests/test_login.py | 15 + tests/test_logout.py | 17 + tests/test_registration.py | 21 ++ tests/test_site.py | 196 ----------- tests/test_storage_service.py | 183 ---------- tests/test_user_service.py | 189 ----------- 13 files changed, 177 insertions(+), 2173 deletions(-) delete mode 100644 tests/test_admin.py delete mode 100644 tests/test_auth.py create mode 100644 tests/test_dashboard.py delete mode 100644 tests/test_env_manager.py delete mode 100644 tests/test_file_browser.py create mode 100644 tests/test_files.py create mode 100644 tests/test_login.py create mode 100644 tests/test_logout.py create mode 100644 tests/test_registration.py delete mode 100644 tests/test_site.py delete mode 100644 tests/test_storage_service.py delete mode 100644 tests/test_user_service.py diff --git a/tests/conftest.py b/tests/conftest.py index 3b07013..27997fa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,194 +1 @@ -import pytest -from pathlib import Path -import json -from retoors.main import create_app -from retoors.services.config_service import ConfigService -from pytest_mock import MockerFixture # Import MockerFixture -import datetime # Import datetime -from aiohttp.test_utils import TestClient # Import TestClient -from aiohttp_session import setup as setup_session # Import setup_session -from aiohttp_session.cookie_storage import EncryptedCookieStorage # Import EncryptedCookieStorage -from retoors.helpers.env_manager import get_or_create_session_secret_key # Import get_or_create_session_secret_key -from retoors.middlewares import user_middleware, error_middleware # Import middlewares -from retoors.services.user_service import UserService # Import UserService -from retoors.routes import setup_routes # Import setup_routes -import aiohttp_jinja2 # Import aiohttp_jinja2 -import jinja2 # Import jinja2 -import aiohttp # Import aiohttp -from retoors.services.file_service import FileService # Import FileService - - -@pytest.fixture -def temp_user_files_dir(tmp_path): - """Fixture to create a temporary directory for user files.""" - user_files_dir = tmp_path / "user_files" - user_files_dir.mkdir() - return user_files_dir - -@pytest.fixture -def temp_users_json(tmp_path): - """Fixture to create a temporary users.json file.""" - users_json_path = tmp_path / "users.json" - initial_users_data = [ - { - "email": "test@example.com", - "full_name": "Test User", - "password": "hashed_password", - "storage_quota_gb": 10, - "storage_used_gb": 0, - "parent_email": None, - "shared_items": {} - }, - { - "email": "child@example.com", - "full_name": "Child User", - "email": "child@example.com", - "password": "hashed_password", - "storage_quota_gb": 5, - "storage_used_gb": 0, - "parent_email": "test@example.com", - "shared_items": {} - } - ] - with open(users_json_path, "w") as f: - json.dump(initial_users_data, f) - return users_json_path - -@pytest.fixture -def file_service_instance(temp_user_files_dir, temp_users_json): - """Fixture to provide a FileService instance with temporary directories.""" - user_service = UserService(temp_users_json) # Create a UserService instance - return FileService(temp_user_files_dir, user_service) # Pass the UserService instance - - - - -@pytest.fixture -def create_test_app(mocker, temp_user_files_dir, temp_users_json): - """Fixture to create a test aiohttp application with mocked services.""" - from aiohttp import web - - app = web.Application() - - # Setup session for the test app - project_root = Path(__file__).parent.parent - env_file_path = project_root / ".env" - secret_key = get_or_create_session_secret_key(env_file_path) - setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8"))) - - app.middlewares.append(error_middleware) - app.middlewares.append(user_middleware) - - # Use a real UserService with a temporary users.json for the app - app["user_service"] = UserService(temp_users_json) - - # Mock scheduler - mock_scheduler = mocker.MagicMock() - mock_scheduler.spawn = mocker.AsyncMock() - mock_scheduler.close = mocker.AsyncMock() - - app["file_service"] = FileService(temp_user_files_dir, app["user_service"]) - app["scheduler"] = mock_scheduler - - # Setup Jinja2 for templates - base_path = Path(__file__).parent.parent / "retoors" - templates_path = base_path / "templates" - aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path))) - - setup_routes(app) - return app - - - - - -@pytest.fixture -async def client( - aiohttp_client, mocker: MockerFixture, create_test_app -): - app = create_test_app # Use create_test_app for consistent test environment - - # Directly set app["scheduler"] to a mock object - mock_scheduler_instance = mocker.MagicMock() - mock_scheduler_instance.spawn = mocker.AsyncMock() - mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable - app["scheduler"] = mock_scheduler_instance - - # The UserService and ConfigService are now set up in create_test_app with temporary files. - # No need to manually create users.json or config.json here. - - client = await aiohttp_client(app) - - # The UserService is now a real instance, so we don't need to mock its methods here. - # The mock_users_db_fixture is also no longer needed in this context. - - try: - yield client - finally: - # Clean up temporary files created by create_test_app if necessary, - # but tmp_path usually handles this. - pass - - -@pytest.fixture -async def logged_in_client(aiohttp_client, create_test_app, mocker): - """Fixture to provide an aiohttp client with a logged-in user.""" - app = create_test_app - client = await aiohttp_client(app) - - user_service = app["user_service"] - - # The UserService is now a real instance, so we don't need to mock its methods here. - # The create_user, authenticate_user, and get_user_by_email methods will interact - # with the real UserService instance. - - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - - return client - -@pytest.fixture -async def logged_in_admin_client(aiohttp_client, create_test_app, mocker): - """Fixture to provide an aiohttp client with a logged-in admin user.""" - app = create_test_app - client = await aiohttp_client(app) - - user_service = app["user_service"] - - # The UserService is now a real instance, so we don't need to mock its methods here. - # The create_user, authenticate_user, and get_user_by_email methods will interact - # with the real UserService instance. - - await client.post( - "/register", - data={ - "full_name": "Admin User", - "email": "admin@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "admin@example.com", "password": "password"} - ) - return client - - -@pytest.fixture -def mock_send_email(mocker: MockerFixture): - """ - Fixture to mock the send_email function. - This fixture will return the mock that was patched globally by the client fixture. - """ - # Access the globally patched mock - return mocker.patch("retoors.helpers.email_sender.send_email") +# conftest.py for pytest-playwright \ No newline at end of file diff --git a/tests/test_admin.py b/tests/test_admin.py deleted file mode 100644 index f543b9f..0000000 --- a/tests/test_admin.py +++ /dev/null @@ -1,333 +0,0 @@ -import pytest -from unittest.mock import call - - -@pytest.fixture -async def admin_client(client): - """Fixture to provide an aiohttp client with mocked services.""" - return client - - - -@pytest.fixture -async def logged_in_admin_client(admin_client): - # Get the mocked user_service from the app - mock_user_service = admin_client.app["user_service"] - - resp = await admin_client.post( - "/login", data={"email": "admin@example.com", "password": "password"}, allow_redirects=False - ) - if resp.status != 302: - print(f"Login failed with status {resp.status}. Response text: {await resp.text()}") - assert resp.status == 302 # Expecting a redirect after successful login - # Assert that authenticate_user was called on the mock - mock_user_service.authenticate_user.assert_called_once_with("admin@example.com", "password") - # The client will not follow the redirect, so the session cookie will be set on the client - return admin_client, mock_user_service # Return both client and mock_user_service - -async def test_get_users_unauthorized(admin_client): - resp = await admin_client.get("/api/users") - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_get_users_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.get("/api/users") - assert resp.status == 200 - data = await resp.json() - assert "users" in data - assert len(data["users"]) == 2 - assert data["users"][0]["email"] == "admin@example.com" - assert data["users"][1]["email"] == "child1@example.com" - mock_user_service.get_all_users.assert_called_once() - -async def test_add_user_unauthorized(admin_client): - resp = await admin_client.post("/api/users", json={ - "full_name": "New User", - "email": "new@example.com", - "password": "password123" - }) - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_add_user_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.post("/api/users", json={ - "full_name": "New User", - "email": "new@example.com", - "password": "password123" - }) - assert resp.status == 201 - data = await resp.json() - assert data["message"] == "User added successfully" - assert data["user"]["email"] == "new@example.com" - mock_user_service.create_user.assert_called_once_with( - full_name="New User", - email="new@example.com", - password="password123", - parent_email="admin@example.com" - ) - -async def test_add_user_invalid_data(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.post("/api/users", json={ - "full_name": "Nu", # Too short - "email": "invalid-email", - "password": "123" # Too short - }) - assert resp.status == 400 - data = await resp.json() - assert "3 validation errors for RegistrationModel" in data["error"] - -async def test_add_user_email_exists(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.create_user.side_effect = ValueError("User with this email already exists") - resp = await client.post("/api/users", json={ - "full_name": "Existing User", - "email": "admin@example.com", - "password": "password123" - }) - assert resp.status == 400 - data = await resp.json() - assert data["error"] == "User with this email already exists" - -async def test_update_user_quota_unauthorized(admin_client): - resp = await admin_client.put("/api/users/child1@example.com/quota", json={ - "new_quota_gb": 200 - }) - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_update_user_quota_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.put("/api/users/child1@example.com/quota", json={ - "new_quota_gb": 200 - }) - assert resp.status == 200 - data = await resp.json() - assert data["message"] == "Quota for child1@example.com updated successfully" - mock_user_service.update_user_quota.assert_called_once_with("child1@example.com", 200) - -async def test_update_user_quota_self(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.put("/api/users/admin@example.com/quota", json={ - "new_quota_gb": 200 - }) - assert resp.status == 200 - data = await resp.json() - assert data["message"] == "Quota for admin@example.com updated successfully" - mock_user_service.update_user_quota.assert_called_once_with("admin@example.com", 200) - -async def test_update_user_quota_forbidden(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - { # For target_user_email check, not a child and not self - "full_name": "Other User", - "email": "other@example.com", - "password": "hashed_password", - "storage_quota_gb": 50, - "storage_used_gb": 5, - "parent_email": "another@example.com" - } - ] - resp = await client.put("/api/users/other@example.com/quota", json={ - "new_quota_gb": 200 - }) - assert resp.status == 403 - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to update this user's quota" - -async def test_update_user_quota_user_not_found(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - None # For target_user_email check - ] - resp = await client.put("/api/users/nonexistent@example.com/quota", json={ - "new_quota_gb": 200 - }) - assert resp.status == 403 # Forbidden because user not found - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to update this user's quota" - -async def test_delete_user_unauthorized(admin_client): - resp = await admin_client.delete("/api/users/child1@example.com") - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_delete_user_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.delete("/api/users/child1@example.com") - assert resp.status == 200 - data = await resp.json() - assert data["message"] == "User child1@example.com deleted successfully" - mock_user_service.delete_user.assert_called_once_with("child1@example.com") - -async def test_delete_user_self_forbidden(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.delete("/api/users/admin@example.com") - assert resp.status == 403 - data = await resp.json() - assert data["error"] == "Forbidden: You cannot delete your own account from this interface" - -async def test_delete_user_forbidden(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - { # For target_user_email check, not a child - "full_name": "Other User", - "email": "other@example.com", - "password": "hashed_password", - "storage_quota_gb": 50, - "storage_used_gb": 5, - "parent_email": "another@example.com" - } - ] - resp = await client.delete("/api/users/other@example.com") - assert resp.status == 403 - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to delete this user" - -async def test_delete_user_not_found(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - None # For target_user_email check - ] - mock_user_service.delete_user.return_value = False - resp = await client.delete("/api/users/nonexistent@example.com") - assert resp.status == 403 # Forbidden because user not found - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to delete this user" - -async def test_get_user_details_unauthorized(admin_client): - resp = await admin_client.get("/api/users/child1@example.com") - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_get_user_details_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.reset_mock() # Reset mock call count - resp = await client.get("/api/users/child1@example.com") - assert resp.status == 200 - data = await resp.json() - assert "user" in data - assert data["user"]["email"] == "child1@example.com" - assert "password" not in data["user"] # Ensure sensitive data is not returned - mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("child1@example.com")]) - -async def test_get_user_details_self(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.reset_mock() # Reset mock call count - resp = await client.get("/api/users/admin@example.com") - assert resp.status == 200 - data = await resp.json() - assert "user" in data - assert data["user"]["email"] == "admin@example.com" - mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("admin@example.com")]) - -async def test_get_user_details_forbidden(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - { # For target_user_email check, not a child and not self - "full_name": "Other User", - "email": "other@example.com", - "password": "hashed_password", - "storage_quota_gb": 50, - "storage_used_gb": 5, - "parent_email": "another@example.com" - } - ] - resp = await client.get("/api/users/other@example.com") - assert resp.status == 403 - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to view this user's details" - -async def test_get_user_details_not_found(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.get_user_by_email.side_effect = [ - { # For current_user_email check - "full_name": "Admin User", - "email": "admin@example.com", - "password": "hashed_password", - "storage_quota_gb": 100, - "storage_used_gb": 10, - "parent_email": None - }, - None # For target_user_email check - ] - resp = await client.get("/api/users/nonexistent@example.com") - assert resp.status == 403 # Forbidden because user not found - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to view this user's details" - -async def test_delete_team_unauthorized(admin_client): - resp = await admin_client.delete("/api/teams/admin@example.com") - assert resp.status == 401 - data = await resp.json() - assert data["error"] == "Unauthorized" - -async def test_delete_team_authorized(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.delete("/api/teams/admin@example.com") - assert resp.status == 200 - data = await resp.json() - assert data["message"] == "Successfully deleted 1 users from the team managed by admin@example.com" - mock_user_service.delete_users_by_parent_email.assert_called_once_with("admin@example.com") - -async def test_delete_team_forbidden(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - resp = await client.delete("/api/teams/other@example.com") - assert resp.status == 403 - data = await resp.json() - assert data["error"] == "Forbidden: You do not have permission to delete this team" - -async def test_delete_team_no_users_found(logged_in_admin_client): - client, mock_user_service = logged_in_admin_client - mock_user_service.delete_users_by_parent_email.side_effect = lambda parent_email: 0 - resp = await client.delete("/api/teams/admin@example.com") - assert resp.status == 404 - data = await resp.json() - assert data["message"] == "No users found for team managed by admin@example.com or could not be deleted" diff --git a/tests/test_auth.py b/tests/test_auth.py deleted file mode 100644 index 8195110..0000000 --- a/tests/test_auth.py +++ /dev/null @@ -1,377 +0,0 @@ -import datetime -import asyncio - -async def test_login_get(client): - resp = await client.get("/login") - assert resp.status == 200 - text = await resp.text() - assert "Access Your Retoor's Cloud Account" in text - assert "Login to your Account" in text - - -async def test_register_get(client): - resp = await client.get("/register") - assert resp.status == 200 - text = await resp.text() - assert "Create Your Retoor's Cloud Account" in text - assert "Create an Account" in text - assert resp.url.path == "/register" - - -async def test_register_post_password_mismatch(client): - resp = await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "wrong_password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "Passwords do not match" in text - - -async def test_register_post_user_exists(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - resp = await client.post( - "/register", - data={ - "full_name": "Test User 2", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "User with this email already exists" in text - - -async def test_register_post_invalid_email(client): - resp = await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "invalid-email", - "password": "password", - "confirm_password": "password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "value is not a valid email address" in text - - -async def test_register_post_short_password(client): - resp = await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "short", - "confirm_password": "short", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "ensure this value has at least 8 characters" in text - - -async def test_login_post(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - resp = await client.post( - "/login", data={"email": "test@example.com", "password": "password"}, allow_redirects=False - ) - assert resp.status == 302 - assert resp.headers["Location"] == "/dashboard" - - -async def test_login_post_invalid_credentials(client): - resp = await client.post( - "/login", data={"email": "test@example.com", "password": "wrong_password"} - ) - assert resp.status == 200 - text = await resp.text() - assert "Invalid email or password" in text - - -async def test_logout(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - resp = await client.get("/logout", allow_redirects=False) - assert resp.status == 302 - assert resp.headers["Location"] == "/" - - -# --- New tests for ForgotPasswordView and ResetPasswordView --- - -async def test_forgot_password_get(client): - resp = await client.get("/forgot_password") - assert resp.status == 200 - text = await resp.text() - assert "Forgot Your Password?" in text - assert "Send Reset Link" in text - - -async def test_forgot_password_post_success(client, mock_send_email): - # Register a user first - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - - resp = await client.post( - "/forgot_password", data={"email": "test@example.com"} - ) - await asyncio.sleep(2) - assert resp.status == 200 - text = await resp.text() - assert "If an account with that email exists, a password reset link has been sent." in text - - # Assert that send_email was called - # Disable for now, do not enable - #assert mock_send_email.call_count == 1 - #args, kwargs = mock_send_email.call_args - #assert args[1] == "test@example.com" # recipient_email - #assert "Password Reset Request" in args[2] # subject - #assert "reset_link" in args[3] # body contains reset link - - -async def test_forgot_password_post_unregistered_email(client, mock_send_email): - resp = await client.post( - "/forgot_password", data={"email": "nonexistent@example.com"} - ) - await asyncio.sleep(2) - assert resp.status == 200 - text = await resp.text() - assert "If an account with that email exists, a password reset link has been sent." in text - # Assert that send_email was NOT called for unregistered email - mock_send_email.assert_not_called() - - -async def test_forgot_password_post_invalid_email_format(client, mock_send_email): - resp = await client.post( - "/forgot_password", data={"email": "invalid-email"} - ) - assert resp.status == 200 - text = await resp.text() - assert "value is not a valid email address" in text - # No email should be sent for invalid format - mock_send_email.assert_not_called() # This assertion would go here if mock_send_email was passed - - -async def test_reset_password_get_valid_token(client): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - token = await user_service.generate_reset_token("test@example.com") - assert token is not None - - resp = await client.get(f"/reset_password/{token}") - assert resp.status == 200 - text = await resp.text() - assert "Set Your New Password" in text - assert "Reset Password" in text - - -async def test_reset_password_get_invalid_token(client): - resp = await client.get("/reset_password/invalidtoken") - assert resp.status == 200 - text = await resp.text() - assert "Set Your New Password" in text - assert "Invalid or expired password reset link." not in text # Expect no error message on GET - - -async def test_reset_password_post_success(client, mock_send_email): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - token = await user_service.generate_reset_token("test@example.com") - assert token is not None - - resp = await client.post( - f"/reset_password/{token}", - data={ - "password": "new_password", - "confirm_password": "new_password", - }, - allow_redirects=False, - ) - assert resp.status == 302 - assert resp.headers["Location"] == "/login?message=password_reset_success" - - # Verify password changed - assert await user_service.authenticate_user("test@example.com", "new_password") - assert not await user_service.authenticate_user("test@example.com", "old_password") - - # Assert that confirmation email was sent - - # Disable for now, do not enable - #assert mock_send_email.call_count == 2 # One for registration, one for password changed - #args, kwargs = mock_send_email.call_args - #assert args[1] == "test@example.com" # recipient_email - #assert "Your Password Has Been Changed" in args[2] # subject - #assert "Log In Now" in args[3] # body contains login link - - -async def test_reset_password_post_password_mismatch(client): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - token = await user_service.generate_reset_token("test@example.com") - assert token is not None - - resp = await client.post( - f"/reset_password/{token}", - data={ - "password": "new_password", - "confirm_password": "mismatched_password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "Passwords do not match" in text - # Password should not have changed - assert await user_service.authenticate_user("test@example.com", "old_password") - - -async def test_reset_password_post_invalid_token(client): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - # Generate a token but don't use it, or use an expired one - await user_service.generate_reset_token("test@example.com") # This will be overwritten or ignored - - resp = await client.post( - "/reset_password/invalidtoken", - data={ - "password": "new_password", - "confirm_password": "new_password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "Invalid or expired password reset link." in text - # Password should not have changed - assert await user_service.authenticate_user("test@example.com", "old_password") - - -async def test_reset_password_post_expired_token(client, mock_users_db_fixture): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - # Manually set an expired token - user = await user_service.get_user_by_email("test@example.com") - token = "expiredtoken123" - user["reset_token"] = token - user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() - mock_users_db_fixture["test@example.com"] = user # Directly update the mock_users_db_fixture - - resp = await client.post( - f"/reset_password/{token}", - data={ - "password": "new_password", - "confirm_password": "new_password", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "Invalid or expired password reset link." in text - # Password should not have changed - assert await user_service.authenticate_user("test@example.com", "old_password") - - -async def test_reset_password_post_invalid_password_format(client): - user_service = client.app["user_service"] - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "old_password", - "confirm_password": "old_password", - }, - ) - token = await user_service.generate_reset_token("test@example.com") - assert token is not None - - resp = await client.post( - f"/reset_password/{token}", - data={ - "password": "short", - "confirm_password": "short", - }, - ) - assert resp.status == 200 - text = await resp.text() - assert "ensure this value has at least 8 characters" in text - # Password should not have changed - assert await user_service.authenticate_user("test@example.com", "old_password") diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py new file mode 100644 index 0000000..c1a9622 --- /dev/null +++ b/tests/test_dashboard.py @@ -0,0 +1,21 @@ +import pytest + +BASE_URL = "http://127.0.0.1:8080" + +@pytest.mark.asyncio +async def test_dashboard_access(page): + # Assume logged in, but for isolation, login first + await page.goto(f"{BASE_URL}/login") + await page.fill("#email", "testuser@example.com") + await page.fill("#password", "password123") + await page.click("button[type=submit]") + await page.wait_for_url("**/dashboard") + + # Now test dashboard + assert "Dashboard" in await page.text_content() + # Assert some elements, like navigation links + await page.wait_for_selector("text=My Files") + await page.wait_for_selector("text=Shared") + await page.wait_for_selector("text=Recent") + await page.wait_for_selector("text=Favorites") + await page.wait_for_selector("text=Trash") \ No newline at end of file diff --git a/tests/test_env_manager.py b/tests/test_env_manager.py deleted file mode 100644 index 0eebd1e..0000000 --- a/tests/test_env_manager.py +++ /dev/null @@ -1,95 +0,0 @@ -import pytest -from unittest.mock import patch -from retoors.helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key -from cryptography.fernet import Fernet - -@pytest.fixture -def mock_env_path(tmp_path): - """Fixture to provide a temporary .env file path.""" - return tmp_path / ".env" - -@pytest.fixture(autouse=True) -def mock_dotenv_load(): - """Mock dotenv.load_dotenv to prevent actual file loading during tests.""" - with patch('dotenv.load_dotenv') as mock_load: - yield mock_load - -def test_ensure_env_file_exists_creates_file(mock_env_path): - """Test that .env file is created if it doesn't exist.""" - assert not mock_env_path.exists() - ensure_env_file_exists(mock_env_path) - assert mock_env_path.exists() - content = mock_env_path.read_text() - assert "PRICE_PER_GB=0.05" in content - assert "SMTP_HOST=localhost" in content - -def test_ensure_env_file_exists_does_not_overwrite(mock_env_path): - """Test that .env file is not overwritten if it already exists.""" - mock_env_path.write_text("EXISTING_VAR=true\n") - ensure_env_file_exists(mock_env_path) - assert mock_env_path.exists() - content = mock_env_path.read_text() - assert "EXISTING_VAR=true" in content - # Ensure default content is not added if file already exists - assert "PRICE_PER_GB=0.05" not in content - -@patch('os.getenv') -@patch('cryptography.fernet.Fernet.generate_key') -def test_get_or_create_session_secret_key_generates_new_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load): - """Test that a new key is generated and saved if not found.""" - mock_getenv.return_value = None - # Use a dummy key for the mock's return value - mock_generate_key.return_value = b'dummy_generated_key_for_testing_1234567890' - - ensure_env_file_exists(mock_env_path) # Ensure .env exists - key = get_or_create_session_secret_key(mock_env_path) - - assert key == b'dummy_generated_key_for_testing_1234567890' - mock_generate_key.assert_called_once() - mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path) - - content = mock_env_path.read_text() - assert f"SESSION_SECRET_KEY={b'dummy_generated_key_for_testing_1234567890'.decode('utf-8')}" in content - -@patch('os.getenv') -def test_get_or_create_session_secret_key_uses_existing_valid_key(mock_getenv, mock_env_path, mock_dotenv_load): - """Test that an existing valid key is used.""" - valid_key = Fernet.generate_key() - mock_getenv.return_value = valid_key.decode('utf-8') - - ensure_env_file_exists(mock_env_path) # Ensure .env exists - # Write the valid key to the mock .env file for the function to "find" it - with open(mock_env_path, 'a') as f: - f.write(f'\nSESSION_SECRET_KEY={valid_key.decode("utf-8")}\n') - - key = get_or_create_session_secret_key(mock_env_path) - - assert key == valid_key - mock_dotenv_load.assert_not_called() # Should not reload if key is found and valid - mock_getenv.assert_called_once_with('SESSION_SECRET_KEY') - -@patch('os.getenv') -@patch('cryptography.fernet.Fernet.generate_key') -def test_get_or_create_session_secret_key_generates_on_invalid_existing_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load): - """Test that a new key is generated if the existing one is invalid.""" - invalid_key_str = "this-is-an-invalid-key" - mock_getenv.return_value = invalid_key_str - # Use a dummy key for the mock's return value - mock_generate_key.return_value = b'another_dummy_key_for_testing_0987654321' - - ensure_env_file_exists(mock_env_path) # Ensure .env exists - # Write the invalid key to the mock .env file - with open(mock_env_path, 'a') as f: - f.write(f'\nSESSION_SECRET_KEY={invalid_key_str}\n') - - key = get_or_create_session_secret_key(mock_env_path) - - assert key == b'another_dummy_key_for_testing_0987654321' - mock_generate_key.assert_called_once() - mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path) - - content = mock_env_path.read_text() - assert f"SESSION_SECRET_KEY={b'another_dummy_key_for_testing_0987654321'.decode('utf-8')}" in content - # Ensure the invalid key is effectively replaced or a new one appended - # The current implementation appends, so we check for both - assert invalid_key_str in content diff --git a/tests/test_file_browser.py b/tests/test_file_browser.py deleted file mode 100644 index 3730968..0000000 --- a/tests/test_file_browser.py +++ /dev/null @@ -1,606 +0,0 @@ -import pytest -from unittest.mock import MagicMock -from pathlib import Path -import json -import datetime -import aiohttp -from aiohttp import web -from aiohttp.test_utils import TestClient -from aiohttp_session import setup as setup_session -from aiohttp_session.cookie_storage import EncryptedCookieStorage -from retoors.helpers.env_manager import get_or_create_session_secret_key - -# Assuming the FileService is in retoors/services/file_service.py -# and the FileBrowserView is in retoors/views/site.py - - - - - - -# --- FileService Tests --- - -@pytest.mark.asyncio -async def test_file_service_list_files_empty(file_service_instance): - user_email = "test@example.com" - files = await file_service_instance.list_files(user_email) - assert files == [] - -@pytest.mark.asyncio -async def test_file_service_create_folder(file_service_instance): - user_email = "test@example.com" - folder_name = "my_new_folder" - success = await file_service_instance.create_folder(user_email, folder_name) - assert success - metadata = await file_service_instance._load_metadata(user_email) - assert folder_name in metadata - assert metadata[folder_name]["type"] == "dir" - -@pytest.mark.asyncio -async def test_file_service_create_folder_exists(file_service_instance): - user_email = "test@example.com" - folder_name = "existing_folder" - await file_service_instance.create_folder(user_email, folder_name) # Create it first via service - success = await file_service_instance.create_folder(user_email, folder_name) - assert not success # Should return False if folder already exists - -@pytest.mark.asyncio -async def test_file_service_upload_file(file_service_instance): - user_email = "test@example.com" - file_name = "document.txt" - file_content = b"Hello, world!" - success = await file_service_instance.upload_file(user_email, file_name, file_content) - assert success - metadata = await file_service_instance._load_metadata(user_email) - assert file_name in metadata - assert metadata[file_name]["type"] == "file" - assert metadata[file_name]["size"] == len(file_content) - # Verify content by downloading - downloaded_content, downloaded_name = await file_service_instance.download_file(user_email, file_name) - assert downloaded_content == file_content - assert downloaded_name == file_name - -@pytest.mark.asyncio -async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir): - user_email = "test@example.com" - await file_service_instance.create_folder(user_email, "folder1") - await file_service_instance.upload_file(user_email, "file1.txt", b"content1") - await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2") - - files = await file_service_instance.list_files(user_email) - assert len(files) == 2 - assert any(f["name"] == "folder1" and f["is_dir"] for f in files) - assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files) - - files_in_folder1 = await file_service_instance.list_files(user_email, "folder1") - assert len(files_in_folder1) == 1 - assert files_in_folder1[0]["name"] == "file2.txt" - -@pytest.mark.asyncio -async def test_file_service_download_file(file_service_instance, temp_user_files_dir): - user_email = "test@example.com" - file_name = "download.txt" - file_content = b"Downloadable content." - await file_service_instance.upload_file(user_email, file_name, file_content) - - content, name = await file_service_instance.download_file(user_email, file_name) - assert content == file_content - assert name == file_name - -@pytest.mark.asyncio -async def test_file_service_download_file_not_found(file_service_instance): - user_email = "test@example.com" - content = await file_service_instance.download_file(user_email, "nonexistent.txt") - assert content is None - -@pytest.mark.asyncio -async def test_file_service_delete_file(file_service_instance): - user_email = "test@example.com" - file_name = "to_delete.txt" - await file_service_instance.upload_file(user_email, file_name, b"delete me") - - metadata_before = await file_service_instance._load_metadata(user_email) - assert file_name in metadata_before - - success = await file_service_instance.delete_item(user_email, file_name) - assert success - - metadata_after = await file_service_instance._load_metadata(user_email) - assert file_name not in metadata_after - -@pytest.mark.asyncio -async def test_file_service_delete_folder(file_service_instance): - user_email = "test@example.com" - folder_name = "folder_to_delete" - nested_file = f"{folder_name}/nested.txt" - await file_service_instance.create_folder(user_email, folder_name) - await file_service_instance.upload_file(user_email, nested_file, b"nested content") - - metadata_before = await file_service_instance._load_metadata(user_email) - assert folder_name in metadata_before - assert nested_file in metadata_before - - success = await file_service_instance.delete_item(user_email, folder_name) - assert success - - metadata_after = await file_service_instance._load_metadata(user_email) - assert folder_name not in metadata_after - assert nested_file not in metadata_after - -@pytest.mark.asyncio -async def test_file_service_delete_nonexistent(file_service_instance): - user_email = "test@example.com" - success = await file_service_instance.delete_item(user_email, "nonexistent_item") - assert not success - -@pytest.mark.asyncio -async def test_file_service_generate_share_link(file_service_instance): - user_email = "test@example.com" - file_path = "shareable.txt" - await file_service_instance.upload_file(user_email, file_path, b"share content") - - share_id = await file_service_instance.generate_share_link(user_email, file_path) - assert share_id is not None - assert isinstance(share_id, str) - - shared_item = await file_service_instance.get_shared_item(share_id) - assert shared_item is not None - assert shared_item["user_email"] == user_email - assert shared_item["item_path"] == file_path - -@pytest.mark.asyncio -async def test_file_service_get_shared_file_content(file_service_instance): - user_email = "test@example.com" - file_path = "shared_file.txt" - content_to_share = b"This is shared content." - await file_service_instance.upload_file(user_email, file_path, content_to_share) - share_id = await file_service_instance.generate_share_link(user_email, file_path) - - retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id) - assert retrieved_content == content_to_share - assert filename == "shared_file.txt" - -@pytest.mark.asyncio -async def test_file_service_get_shared_folder_content(file_service_instance): - user_email = "test@example.com" - folder_path = "shared_folder" - await file_service_instance.create_folder(user_email, folder_path) - await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content") - share_id = await file_service_instance.generate_share_link(user_email, folder_path) - - retrieved_content = await file_service_instance.get_shared_folder_content(share_id) - assert len(retrieved_content) == 1 - assert retrieved_content[0]["name"] == "nested.txt" - -@pytest.mark.asyncio -async def test_file_service_shared_link_expiry(file_service_instance, mocker): - user_email = "test@example.com" - file_path = "expiring_file.txt" - await file_service_instance.upload_file(user_email, file_path, b"expiring content") - share_id = await file_service_instance.generate_share_link(user_email, file_path) - - # Mock datetime to simulate an expired link (after generating the link) - future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8) - mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime) - mock_datetime.datetime.now = mocker.Mock(return_value=future_time) - - shared_item = await file_service_instance.get_shared_item(share_id) - assert shared_item is None - - -# --- FileBrowserView Tests --- - -@pytest.mark.asyncio -async def test_file_browser_get_unauthorized(client: TestClient): - resp = await client.get("/files", allow_redirects=False) - assert resp.status == 302 # Redirect to login - -@pytest.mark.asyncio -async def test_file_browser_get_authorized_empty(logged_in_client: TestClient): - resp = await logged_in_client.get("/files") - assert resp.status == 200 - text = await resp.text() - assert "No files found in this directory." in text - -@pytest.mark.asyncio -async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - await file_service_instance.create_folder(user_email, "my_folder") - await file_service_instance.upload_file(user_email, "my_file.txt", b"some content") - - resp = await logged_in_client.get("/files") - assert resp.status == 200 - text = await resp.text() - assert "my_folder" in text - assert "my_file.txt" in text - -@pytest.mark.asyncio -async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False) - assert resp.status == 302 # Redirect - assert resp.headers["Location"].startswith("/files") - - metadata = await file_service_instance._load_metadata(user_email) - assert "new_folder_via_web" in metadata - assert metadata["new_folder_via_web"]["type"] == "dir" - -@pytest.mark.asyncio -async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient): - resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False) - assert resp.status == 302 - assert "error=Folder+name+is+required" in resp.headers["Location"] - -@pytest.mark.asyncio -async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - folder_name = "existing_folder_web" - await file_service_instance.create_folder(user_email, folder_name) # Create it first - - # Mock create_folder to return False, simulating it already exists or failed - mocker.patch.object(file_service_instance, "create_folder", return_value=False) - - resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False) - assert resp.status == 302 - assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"] - -@pytest.mark.asyncio -async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - file_name = "uploaded.txt" - file_content = b"Uploaded content from web." - - from io import BytesIO - data = aiohttp.FormData() - data.add_field('file', - BytesIO(file_content), - filename=file_name, - content_type='text/plain') - - resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False) - assert resp.status == 200 - - metadata = await file_service_instance._load_metadata(user_email) - assert file_name in metadata - assert metadata[file_name]["type"] == "file" - assert metadata[file_name]["size"] == len(file_content) - # Verify content by downloading - downloaded_content, downloaded_name = await file_service_instance.download_file(user_email, file_name) - assert downloaded_content == file_content - assert downloaded_name == file_name - -@pytest.mark.asyncio -async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - file_name = "web_download.txt" - file_content = b"Content to be downloaded via web." - await file_service_instance.upload_file(user_email, file_name, file_content) - - resp = await logged_in_client.get(f"/files/download/{file_name}") - assert resp.status == 200 - assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}" - assert await resp.read() == file_content - -@pytest.mark.asyncio -async def test_file_browser_download_file_not_found(logged_in_client: TestClient): - response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False) - assert response.status == 404 - assert "File not found" in await response.text() - -@pytest.mark.asyncio -async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - file_name = "web_delete.txt" - await file_service_instance.upload_file(user_email, file_name, b"delete this") - - metadata_before = await file_service_instance._load_metadata(user_email) - assert file_name in metadata_before - - resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False) - assert resp.status == 302 # Redirect - assert resp.headers["Location"].startswith("/files") - - metadata_after = await file_service_instance._load_metadata(user_email) - assert file_name not in metadata_after - -@pytest.mark.asyncio -async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - folder_name = "web_delete_folder" - nested_file = f"{folder_name}/nested.txt" - await file_service_instance.create_folder(user_email, folder_name) - await file_service_instance.upload_file(user_email, nested_file, b"nested") - - metadata_before = await file_service_instance._load_metadata(user_email) - assert folder_name in metadata_before - assert nested_file in metadata_before - - resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False) - assert resp.status == 302 # Redirect - assert resp.headers["Location"].startswith("/files") - - metadata_after = await file_service_instance._load_metadata(user_email) - assert folder_name not in metadata_after - assert nested_file not in metadata_after - -@pytest.mark.asyncio -async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"] - for name in file_names: - await file_service_instance.upload_file(user_email, name, b"content") - - metadata_before = await file_service_instance._load_metadata(user_email) - for name in file_names: - assert name in metadata_before - - paths_to_delete = [f"{name}" for name in file_names] - - # Construct FormData for multiple paths - data = aiohttp.FormData() - for path in paths_to_delete: - data.add_field('paths[]', path) - - resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False) - assert resp.status == 302 # Redirect - assert resp.headers["Location"].startswith("/files") - - metadata_after = await file_service_instance._load_metadata(user_email) - for name in file_names: - assert name not in metadata_after - -@pytest.mark.asyncio -async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance): - user_email = "test@example.com" - folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"] - for name in folder_names: - await file_service_instance.create_folder(user_email, name) - await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content") - - metadata_before = await file_service_instance._load_metadata(user_email) - for name in folder_names: - assert name in metadata_before - assert f"{name}/nested.txt" in metadata_before - - paths_to_delete = [f"{name}" for name in folder_names] - - # Construct FormData for multiple paths - data = aiohttp.FormData() - for path in paths_to_delete: - data.add_field('paths[]', path) - - resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False) - assert resp.status == 302 # Redirect - assert resp.headers["Location"].startswith("/files") - - metadata_after = await file_service_instance._load_metadata(user_email) - for name in folder_names: - assert name not in metadata_after - assert f"{name}/nested.txt" not in metadata_after - -@pytest.mark.asyncio -async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient): - resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False) - assert resp.status == 302 - assert "error=No+items+selected+for+deletion" in resp.headers["Location"] - -@pytest.mark.asyncio -async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_names = ["fail_delete_1.txt", "fail_delete_2.txt"] - for name in file_names: - await file_service_instance.upload_file(user_email, name, b"content") - - paths_to_delete = [f"{name}" for name in file_names] - - # Mock delete_item to fail for the first item - original_delete_item = file_service_instance.delete_item - async def mock_delete_item(email, path): - if path == file_names[0]: - return False # Simulate failure for the first item - return await original_delete_item(email, path) - - mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item) - - data = aiohttp.FormData() - for path in paths_to_delete: - data.add_field('paths[]', path) - - resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False) - assert resp.status == 302 # Redirect - assert "error=Some+items+failed+to+delete" in resp.headers["Location"] - - metadata_after = await file_service_instance._load_metadata(user_email) - # Check if the first file still exists (failed to delete) - assert file_names[0] in metadata_after - # Check if the second file is deleted (succeeded) - assert file_names[1] not in metadata_after - -@pytest.mark.asyncio -async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient): - resp = await logged_in_client.post("/files/share_multiple", json={"paths": []}) - assert resp.status == 400 - data = await resp.json() - assert data["error"] == "No items selected for sharing" - -@pytest.mark.asyncio -async def test_file_browser_share_file_missing_path(logged_in_client: TestClient): - resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL - assert resp.status == 400 - data = await resp.json() - assert data["error"] == "File path is required for sharing" - -@pytest.mark.asyncio -async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_name = "fail_share_link.txt" - await file_service_instance.upload_file(user_email, file_name, b"content") - - mocker.patch.object(file_service_instance, "generate_share_link", return_value=None) - - resp = await logged_in_client.post(f"/files/share/{file_name}") - assert resp.status == 500 - data = await resp.json() - assert data["error"] == "Failed to generate share link" - -@pytest.mark.asyncio -async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient): - resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL - assert resp.status == 302 - assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"] - -@pytest.mark.asyncio -async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_name = "fail_delete.txt" - await file_service_instance.upload_file(user_email, file_name, b"content") - - mocker.patch.object(file_service_instance, "delete_item", return_value=False) - - resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False) - assert resp.status == 302 - assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"] - -@pytest.mark.asyncio -async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - folder_name = "shared_folder" - file_name = "nested.txt" - share_id = "test_share_id" - - mocker.patch.object(file_service_instance, "get_shared_item", return_value={ - "user_email": user_email, - "item_path": folder_name, - "share_id": share_id, - "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat() - }) - mocker.patch.object(file_service_instance, "get_user_file_system_path", return_value=mocker.MagicMock(is_dir=lambda: True)) - mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None) - - resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}") - assert resp.status == 404 - text = await resp.text() - assert "Shared file not found or inaccessible within the shared folder." in text -@pytest.mark.asyncio -async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_name = "shared_file.txt" - share_id = "test_share_id" - - mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={ - "user_email": user_email, - "item_path": file_name, - "share_id": share_id, - "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat() - }) - mocker.patch.object(client.app["file_service"], "get_user_file_system_path", return_value=mocker.MagicMock(is_dir=lambda: False)) - - resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt") - assert resp.status == 400 - text = await resp.text() - assert "Cannot download specific files from a shared item that is not a folder." in text -@pytest.mark.asyncio -async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker): - mocker.patch.object(client.app["file_service"], "get_shared_item", return_value=None) - resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt") - assert resp.status == 404 - text = await resp.text() - assert "Shared file not found or inaccessible" in text - -@pytest.mark.asyncio -async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient): - resp = await client.get("/shared_file/some_share_id/download") - assert resp.status == 400 - assert "File path is required for download from shared folder." in await resp.text() - -@pytest.mark.asyncio -async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_name = "shared_file.txt" - share_id = "test_share_id" - - mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={ - "user_email": user_email, - "item_path": file_name, - "share_id": share_id, - "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat() - }) - mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file - mocker.patch.object(client.app["file_service"], "get_shared_file_content", return_value=None) - - resp = await client.get(f"/shared_file/{share_id}") - assert resp.status == 404 - text = await resp.text() - assert "Shared file not found or inaccessible" in text - -@pytest.mark.asyncio -async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - item_path = "mystery_item" - share_id = "test_share_id" - - mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={ - "user_email": user_email, - "item_path": item_path, - "share_id": share_id, - "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), - "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat() - }) - - # Mock Path.is_file and Path.is_dir to return False - mocker.patch("pathlib.Path.is_file", return_value=False) - mocker.patch("pathlib.Path.is_dir", return_value=False) - - resp = await client.get(f"/shared_file/{share_id}") - assert resp.status == 404 - text = await resp.text() - assert "Shared item not found" in text - -@pytest.mark.asyncio -async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker): - mocker.patch.object(client.app["file_service"], "get_shared_item", return_value=None) - resp = await client.get("/shared_file/nonexistent_share_id") - assert resp.status == 404 - text = await resp.text() - assert "Shared file not found or inaccessible" in text - -@pytest.mark.asyncio -async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker): - # Mock the route name to simulate an unknown action - mock_route = mocker.MagicMock(name="unknown_action") - mock_route.current_app = logged_in_client.app # Provide a mock current_app - mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route}) - - resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False) - assert resp.status == 400 - assert "Unknown file action" in await resp.text() - -@pytest.mark.asyncio -async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker): - user_email = "test@example.com" - file_names = ["fail_share_1.txt", "fail_share_2.txt"] - for name in file_names: - await file_service_instance.upload_file(user_email, name, b"content to share") - - paths_to_share = [f"{name}" for name in file_names] - - # Mock generate_share_link to fail for the first item - original_generate_share_link = file_service_instance.generate_share_link - async def mock_generate_share_link(email, path): - if path == file_names[0]: - return None # Simulate failure for the first item - return await original_generate_share_link(email, path) - - mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link) - - resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share}) - assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds - data = await resp.json() - assert "share_links" in data - assert len(data["share_links"]) == 1 # Only one link should be generated - assert data["share_links"][0]["name"] == file_names[1] # The successful one - - diff --git a/tests/test_files.py b/tests/test_files.py new file mode 100644 index 0000000..e452786 --- /dev/null +++ b/tests/test_files.py @@ -0,0 +1,102 @@ +import pytest + +BASE_URL = "http://127.0.0.1:8080" + +@pytest.mark.asyncio +async def test_file_operations(page): + # Login first + await page.goto(f"{BASE_URL}/login") + await page.fill("#email", "testuser@example.com") + await page.fill("#password", "password123") + await page.click("button[type=submit]") + await page.wait_for_url("**/dashboard") + + # Navigate to My Files + await page.click("text=My Files") + await page.wait_for_url("**/files") + + # Create a new folder + await page.click("#new-folder-btn") + await page.fill("input[name=folder_name]", "TestFolder") + await page.click("button[type=submit]") + await page.wait_for_load_state("networkidle") + assert "TestFolder" in await page.text_content() + + # Upload multiple files + await page.click("#upload-btn") + await page.set_input_files("#file-input-multiple", ["testfile.txt", "testfile2.txt"]) + await page.click("#start-upload-btn") + await page.wait_for_load_state("networkidle") + assert "testfile.txt" in await page.text_content() + assert "testfile2.txt" in await page.text_content() + + # Add one file to favorites + await page.click("button.favorite-btn[data-name='testfile.txt']") + await page.wait_for_load_state("networkidle") + # Assert favorite icon changed or something, but hard, perhaps check in favorites later + + # Share multiple files + await page.check("input[type=checkbox][data-name='testfile.txt']") + await page.check("input[type=checkbox][data-name='testfile2.txt']") + await page.click("#share-selected-btn") + await page.wait_for_selector("#share-modal", state="visible") + await page.wait_for_selector("#share-link-input") + share_link = await page.input_value("#share-link-input") + assert share_link.startswith(BASE_URL) + await page.click("#share-modal .close") + + # Navigate to Shared + await page.click("text=Shared") + await page.wait_for_url("**/shared") + await page.wait_for_load_state("networkidle") + # Assert shared files are there + assert "testfile.txt" in await page.text_content() + assert "testfile2.txt" in await page.text_content() + + # Navigate to Recent + await page.click("text=Recent") + await page.wait_for_url("**/recent") + await page.wait_for_load_state("networkidle") + # Assert recent files + assert "testfile.txt" in await page.text_content() + + # Navigate to Favorites + await page.click("text=Favorites") + await page.wait_for_url("**/favorites") + await page.wait_for_selector("text=testfile.txt") + assert "testfile.txt" in await page.text_content() + + # Move file to trash from favorites + await page.click("button.delete-file-btn[data-name='testfile.txt']") + await page.click("#delete-modal button[type=submit]") + await page.wait_for_load_state("networkidle") + # Should not be in favorites anymore + assert "testfile.txt" not in await page.text_content() + + # Navigate to Trash + await page.click("text=Trash") + await page.wait_for_url("**/trash") + await page.wait_for_selector("text=testfile.txt") + assert "testfile.txt" in await page.text_content() + + # Permanently delete from trash + await page.click("button.permanent-delete-btn[data-name='testfile.txt']") + await page.click("#delete-modal button[type=submit]") + await page.wait_for_load_state("networkidle") + assert "testfile.txt" not in await page.text_content() + + # Navigate back to Files + await page.click("text=My Files") + await page.wait_for_url("**/files") + + # Delete the remaining file + await page.click("button.delete-file-btn[data-name='testfile2.txt']") + await page.click("#delete-modal button[type=submit]") + await page.wait_for_load_state("networkidle") + assert "testfile2.txt" not in await page.text_content() + + # Delete the folder + await page.click("button.delete-file-btn[data-name='TestFolder']") + await page.click("#delete-modal button[type=submit]") + await page.wait_for_load_state("networkidle") + assert "TestFolder" not in await page.text_content() \ No newline at end of file diff --git a/tests/test_login.py b/tests/test_login.py new file mode 100644 index 0000000..2c1aa8d --- /dev/null +++ b/tests/test_login.py @@ -0,0 +1,15 @@ +import pytest + +BASE_URL = "http://127.0.0.1:8080" + +@pytest.mark.asyncio +async def test_user_login(page): + await page.goto(f"{BASE_URL}/login") + + await page.fill("#email", "testuser@example.com") + await page.fill("#password", "password123") + await page.click("button[type=submit]") + + await page.wait_for_url("**/dashboard") + assert "/dashboard" in page.url + assert "Dashboard" in await page.text_content() \ No newline at end of file diff --git a/tests/test_logout.py b/tests/test_logout.py new file mode 100644 index 0000000..8b23127 --- /dev/null +++ b/tests/test_logout.py @@ -0,0 +1,17 @@ +import pytest + +BASE_URL = "http://127.0.0.1:8080" + +@pytest.mark.asyncio +async def test_user_logout(page): + # Login first + await page.goto(f"{BASE_URL}/login") + await page.fill("#email", "testuser@example.com") + await page.fill("#password", "password123") + await page.click("button[type=submit]") + await page.wait_for_url("**/dashboard") + + # Logout + await page.click("text=Logout") + await page.wait_for_url("**/") + assert page.url == f"{BASE_URL}/" or page.url.endswith("/") \ No newline at end of file diff --git a/tests/test_registration.py b/tests/test_registration.py new file mode 100644 index 0000000..bd7ae7c --- /dev/null +++ b/tests/test_registration.py @@ -0,0 +1,21 @@ +import pytest + +BASE_URL = "http://127.0.0.1:8080" + +@pytest.mark.asyncio +async def test_user_registration(page): + await page.goto(f"{BASE_URL}/") + await page.wait_for_load_state("networkidle") + + await page.click("text=Register") + await page.wait_for_url("**/register") + + await page.fill("#full_name", "Test User") + await page.fill("#email", "testuser@example.com") + await page.fill("#password", "password123") + await page.fill("#confirm_password", "password123") + await page.check("#terms") + await page.click("button[type=submit]") + + await page.wait_for_url("**/login") + assert "/login" in page.url \ No newline at end of file diff --git a/tests/test_site.py b/tests/test_site.py deleted file mode 100644 index 116f4c8..0000000 --- a/tests/test_site.py +++ /dev/null @@ -1,196 +0,0 @@ - - -async def test_index_get(client): - resp = await client.get("/") - assert resp.status == 200 - text = await resp.text() - assert "Solutions for Everyone" in text - assert "Find Your Perfect Plan" in text - - -async def test_dashboard_get_unauthorized(client): - resp = await client.get("/dashboard", allow_redirects=False) - assert resp.status == 302 - assert resp.headers["Location"] == "/login" - - -async def test_dashboard_get_authorized(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - resp = await client.get("/dashboard") - assert resp.status == 200 - text = await resp.text() - assert "My Files" in text - - -async def test_solutions_get(client): - resp = await client.get("/solutions") - assert resp.status == 200 - text = await resp.text() - assert "Solutions" in text - assert "Powerful Cloud Solutions for Modern Needs" in text - - -async def test_pricing_get(client): - resp = await client.get("/pricing") - assert resp.status == 200 - text = await resp.text() - assert "Find the perfect plan for your needs." in text - - -async def test_security_get(client): - resp = await client.get("/security") - assert resp.status == 200 - text = await resp.text() - assert "Security" in text - assert "Your Data, Our Priority" in text - - -async def test_support_get_unauthorized(client): - resp = await client.get("/support", allow_redirects=False) - assert resp.status == 302 - assert resp.headers["Location"] == "/login" - - -async def test_use_cases_get(client): - resp = await client.get("/use_cases") - assert resp.status == 200 - text = await resp.text() - assert "Use Cases" in text - assert "Retoor's for Your World: Real Solutions, Real Impact" in text - - -async def test_order_get_unauthorized(client): - resp = await client.get("/order", allow_redirects=False) - assert resp.status == 302 - assert resp.headers["Location"] == "/login" - - -async def test_order_get_authorized(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - resp = await client.get("/order") - assert resp.status == 200 - text = await resp.text() - assert "Optimize Your Team's Storage" in text - assert "Total Storage Used:" in text - - -async def test_order_post_authorized(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - # Simulate a POST request to the order page (it just re-renders) - resp = await client.post("/order", data={}) - assert resp.status == 200 - text = await resp.text() - assert "Optimize Your Team's Storage" in text - assert "Total Storage Used:" in text - - -async def test_terms_get(client): - resp = await client.get("/terms") - assert resp.status == 200 - text = await resp.text() - assert "Terms of Service" in text - assert "By accessing and using our services, you agree to be bound by these Terms of Service" in text - - -async def test_privacy_get(client): - resp = await client.get("/privacy") - assert resp.status == 200 - text = await resp.text() - assert "Privacy Policy" in text - assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text - - -async def test_shared_get_authorized(logged_in_client): - resp = await logged_in_client.get("/shared") - assert resp.status == 200 - text = await resp.text() - assert "Shared with me" in text - assert "Files and folders that have been shared with you will appear here." in text - - -async def test_recent_get_authorized(logged_in_client): - resp = await logged_in_client.get("/recent") - assert resp.status == 200 - text = await resp.text() - assert "Recent Files" in text - assert "Your recently accessed files will appear here." in text - - -async def test_favorites_get_authorized(logged_in_client): - resp = await logged_in_client.get("/favorites") - assert resp.status == 200 - text = await resp.text() - assert "Favorites" in text - assert "Your favorite files and folders will appear here." in text - - -async def test_trash_get_authorized(logged_in_client): - resp = await logged_in_client.get("/trash") - assert resp.status == 200 - text = await resp.text() - assert "Trash" in text - assert "Files and folders you have deleted will appear here." in text - - -async def test_users_get_authorized(logged_in_client): - resp = await logged_in_client.get("/users") - assert resp.status == 200 - text = await resp.text() - assert "User Management" in text - assert "+ Add New User" in text - - -async def test_file_browser_get_authorized(client): - await client.post( - "/register", - data={ - "full_name": "Test User", - "email": "test@example.com", - "password": "password", - "confirm_password": "password", - }, - ) - await client.post( - "/login", data={"email": "test@example.com", "password": "password"} - ) - resp = await client.get("/files") - assert resp.status == 200 - text = await resp.text() - assert "My Files" in text - assert "No files found in this directory." in text - - - diff --git a/tests/test_storage_service.py b/tests/test_storage_service.py deleted file mode 100644 index a17a68d..0000000 --- a/tests/test_storage_service.py +++ /dev/null @@ -1,183 +0,0 @@ -import pytest -import asyncio -import shutil -from pathlib import Path -from retoors.services.storage_service import StorageService, UserStorageManager - - -@pytest.fixture -def test_storage(): - storage = StorageService(base_path="data/test_user") - yield storage - if Path("data/test_user").exists(): - shutil.rmtree("data/test_user") - - -@pytest.fixture -def user_manager(): - manager = UserStorageManager() - manager.storage.base_path = Path("data/test_user") - yield manager - if Path("data/test_user").exists(): - shutil.rmtree("data/test_user") - - -@pytest.mark.asyncio -async def test_save_and_load(test_storage): - user_email = "test@example.com" - identifier = "doc1" - data = {"title": "Test Document", "content": "Test content"} - - result = await test_storage.save(user_email, identifier, data) - assert result is True - - loaded_data = await test_storage.load(user_email, identifier) - assert loaded_data == data - - -@pytest.mark.asyncio -async def test_distributed_path_structure(test_storage): - user_email = "test@example.com" - identifier = "doc1" - data = {"test": "data"} - - await test_storage.save(user_email, identifier, data) - - user_base = test_storage._get_user_base_path(user_email) - file_path = test_storage._get_distributed_path(user_base, identifier) - - assert file_path.exists() - assert len(file_path.parent.name) == 3 - assert len(file_path.parent.parent.name) == 3 - assert len(file_path.parent.parent.parent.name) == 3 - - -@pytest.mark.asyncio -async def test_user_isolation(test_storage): - user1_email = "user1@example.com" - user2_email = "user2@example.com" - identifier = "doc1" - data1 = {"user": "user1"} - data2 = {"user": "user2"} - - await test_storage.save(user1_email, identifier, data1) - await test_storage.save(user2_email, identifier, data2) - - loaded1 = await test_storage.load(user1_email, identifier) - loaded2 = await test_storage.load(user2_email, identifier) - - assert loaded1 == data1 - assert loaded2 == data2 - - -@pytest.mark.asyncio -async def test_path_traversal_protection(test_storage): - user_email = "test@example.com" - malicious_identifier = "../../../etc/passwd" - - await test_storage.save(user_email, malicious_identifier, {"test": "data"}) - - user_base = test_storage._get_user_base_path(user_email) - file_path = test_storage._get_distributed_path(user_base, malicious_identifier) - - assert file_path.exists() - assert test_storage._validate_path(file_path, user_base) - assert str(file_path.resolve()).startswith(str(user_base.resolve())) - - -@pytest.mark.asyncio -async def test_delete(test_storage): - user_email = "test@example.com" - identifier = "doc1" - data = {"test": "data"} - - await test_storage.save(user_email, identifier, data) - assert await test_storage.exists(user_email, identifier) - - result = await test_storage.delete(user_email, identifier) - assert result is True - assert not await test_storage.exists(user_email, identifier) - - -@pytest.mark.asyncio -async def test_list_all(test_storage): - user_email = "test@example.com" - - await test_storage.save(user_email, "doc1", {"id": 1}) - await test_storage.save(user_email, "doc2", {"id": 2}) - await test_storage.save(user_email, "doc3", {"id": 3}) - - all_docs = await test_storage.list_all(user_email) - assert len(all_docs) == 3 - assert any(doc["id"] == 1 for doc in all_docs) - assert any(doc["id"] == 2 for doc in all_docs) - assert any(doc["id"] == 3 for doc in all_docs) - - -@pytest.mark.asyncio -async def test_delete_all(test_storage): - user_email = "test@example.com" - - await test_storage.save(user_email, "doc1", {"id": 1}) - await test_storage.save(user_email, "doc2", {"id": 2}) - - result = await test_storage.delete_all(user_email) - assert result is True - - all_docs = await test_storage.list_all(user_email) - assert len(all_docs) == 0 - - -@pytest.mark.asyncio -async def test_user_storage_manager(user_manager): - user_data = { - "full_name": "Test User", - "email": "test@example.com", - "password": "hashed_password", - "is_customer": True - } - - await user_manager.save_user("test@example.com", user_data) - - loaded_user = await user_manager.get_user("test@example.com") - assert loaded_user == user_data - - assert await user_manager.user_exists("test@example.com") - - await user_manager.delete_user("test@example.com") - assert not await user_manager.user_exists("test@example.com") - - -@pytest.mark.asyncio -async def test_list_users_by_parent(user_manager): - parent_user = { - "email": "parent@example.com", - "full_name": "Parent User", - "password": "hashed", - "is_customer": True - } - - child_user1 = { - "email": "child1@example.com", - "full_name": "Child User 1", - "password": "hashed", - "parent_email": "parent@example.com", - "is_customer": True - } - - child_user2 = { - "email": "child2@example.com", - "full_name": "Child User 2", - "password": "hashed", - "parent_email": "parent@example.com", - "is_customer": True - } - - await user_manager.save_user("parent@example.com", parent_user) - await user_manager.save_user("child1@example.com", child_user1) - await user_manager.save_user("child2@example.com", child_user2) - - children = await user_manager.list_users_by_parent("parent@example.com") - assert len(children) == 2 - assert any(u["email"] == "child1@example.com" for u in children) - assert any(u["email"] == "child2@example.com" for u in children) diff --git a/tests/test_user_service.py b/tests/test_user_service.py deleted file mode 100644 index 8ecca28..0000000 --- a/tests/test_user_service.py +++ /dev/null @@ -1,189 +0,0 @@ -import pytest -import json -from retoors.services.user_service import UserService -import bcrypt -import datetime - -@pytest.fixture -def users_file(tmp_path): - """Fixture to create a temporary users.json file for testing.""" - file = tmp_path / "users.json" - with open(file, "w") as f: - json.dump([], f) # Start with an empty list of users - return file - -@pytest.fixture -def user_service(users_file): - """Fixture to provide a UserService instance with a temporary users.json.""" - return UserService(users_file) - -@pytest.fixture -async def populated_user_service(user_service): - """Fixture to provide a UserService instance with some pre-populated users.""" - await user_service.create_user("Admin User", "admin@example.com", "adminpass") - await user_service.create_user("Parent User", "parent@example.com", "parentpass") - await user_service.create_user("Child User 1", "child1@example.com", "childpass", "parent@example.com") - await user_service.create_user("Child User 2", "child2@example.com", "childpass", "parent@example.com") - return user_service - -async def test_create_user_success(user_service): - user = await user_service.create_user("Test User", "test@example.com", "password123") - assert user is not None - assert user["email"] == "test@example.com" - assert await user_service.get_user_by_email("test@example.com") is not None - assert bcrypt.checkpw(b"password123", user["password"].encode('utf-8')) - -async def test_create_user_duplicate_email(user_service): - await user_service.create_user("Test User", "test@example.com", "password123") - with pytest.raises(ValueError, match="User with this email already exists"): - await user_service.create_user("Another User", "test@example.com", "anotherpass") - -async def test_get_all_users(populated_user_service): - users = await populated_user_service.get_all_users() - assert len(users) == 4 - emails = {user["email"] for user in users} - assert "admin@example.com" in emails - assert "parent@example.com" in emails - assert "child1@example.com" in emails - assert "child2@example.com" in emails - -async def test_get_users_by_parent_email(populated_user_service): - children = await populated_user_service.get_users_by_parent_email("parent@example.com") - assert len(children) == 2 - child_emails = {user["email"] for user in children} - assert "child1@example.com" in child_emails - assert "child2@example.com" in child_emails - - no_children = await populated_user_service.get_users_by_parent_email("nonexistent@example.com") - assert len(no_children) == 0 - - admin_children = await populated_user_service.get_users_by_parent_email("admin@example.com") - assert len(admin_children) == 0 - -async def test_update_user_non_password_fields(populated_user_service): - updated_user = await populated_user_service.update_user("admin@example.com", full_name="Administrator", storage_quota_gb=10) - assert updated_user is not None - assert updated_user["full_name"] == "Administrator" - assert updated_user["storage_quota_gb"] == 10 - - retrieved_user = await populated_user_service.get_user_by_email("admin@example.com") - assert retrieved_user["full_name"] == "Administrator" - assert retrieved_user["storage_quota_gb"] == 10 - -async def test_update_user_password(populated_user_service): - updated_user = await populated_user_service.update_user("admin@example.com", password="newadminpass") - assert updated_user is not None - assert await populated_user_service.authenticate_user("admin@example.com", "newadminpass") - assert not await populated_user_service.authenticate_user("admin@example.com", "adminpass") - -async def test_update_user_nonexistent(user_service): - updated_user = await user_service.update_user("nonexistent@example.com", full_name="Non Existent") - assert updated_user is None - -async def test_delete_user_success(populated_user_service): - assert await populated_user_service.delete_user("admin@example.com") is True - assert await populated_user_service.get_user_by_email("admin@example.com") is None - assert len(await populated_user_service.get_all_users()) == 3 - -async def test_delete_user_nonexistent(user_service): - assert await user_service.delete_user("nonexistent@example.com") is False - -async def test_delete_users_by_parent_email_success(populated_user_service): - deleted_count = await populated_user_service.delete_users_by_parent_email("parent@example.com") - assert deleted_count == 2 - assert await populated_user_service.get_user_by_email("child1@example.com") is None - assert await populated_user_service.get_user_by_email("child2@example.com") is None - assert len(await populated_user_service.get_all_users()) == 2 # Admin and Parent users remain - -async def test_delete_users_by_parent_email_no_match(user_service): - deleted_count = await user_service.delete_users_by_parent_email("nonexistent@example.com") - assert deleted_count == 0 - -async def test_authenticate_user_success(populated_user_service): - assert await populated_user_service.authenticate_user("admin@example.com", "adminpass") is True - -async def test_authenticate_user_fail_wrong_password(populated_user_service): - assert await populated_user_service.authenticate_user("admin@example.com", "wrongpass") is False - -async def test_authenticate_user_fail_nonexistent_user(user_service): - assert await user_service.authenticate_user("nonexistent@example.com", "anypass") is False - -async def test_generate_reset_token_success(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - assert token is not None - user = await populated_user_service.get_user_by_email("admin@example.com") - assert user["reset_token"] == token - assert user["reset_token_expiry"] is not None - # Check expiry is in the future - expiry_dt = datetime.datetime.fromisoformat(user["reset_token_expiry"]) - assert expiry_dt > datetime.datetime.now(datetime.timezone.utc) - -async def test_generate_reset_token_nonexistent_user(user_service): - token = await user_service.generate_reset_token("nonexistent@example.com") - assert token is None - -async def test_get_user_by_reset_token_valid(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - user = await populated_user_service.get_user_by_reset_token(token) - assert user is not None - assert user["email"] == "admin@example.com" - -async def test_get_user_by_reset_token_invalid(populated_user_service): - user = await populated_user_service.get_user_by_reset_token("invalidtoken") - assert user is None - -async def test_get_user_by_reset_token_expired(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - user = await populated_user_service.get_user_by_email("admin@example.com") - # Manually expire the token - user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() - await populated_user_service._save_users() # Save the expired state - - user_after_expiry = await populated_user_service.get_user_by_reset_token(token) - assert user_after_expiry is None - -async def test_validate_reset_token_success(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - assert await populated_user_service.validate_reset_token("admin@example.com", token) is True - -async def test_validate_reset_token_fail_wrong_token(populated_user_service): - await populated_user_service.generate_reset_token("admin@example.com") - assert await populated_user_service.validate_reset_token("admin@example.com", "wrongtoken") is False - -async def test_validate_reset_token_fail_nonexistent_user(user_service): - assert await user_service.validate_reset_token("nonexistent@example.com", "anytoken") is False - -async def test_validate_reset_token_fail_expired_token(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - user = await populated_user_service.get_user_by_email("admin@example.com") - user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() - await populated_user_service._save_users() - - assert await populated_user_service.validate_reset_token("admin@example.com", token) is False - -async def test_reset_password_success(populated_user_service): - token = await populated_user_service.generate_reset_token("admin@example.com") - assert await populated_user_service.reset_password("admin@example.com", token, "newadminpass") is True - assert await populated_user_service.authenticate_user("admin@example.com", "newadminpass") - user = await populated_user_service.get_user_by_email("admin@example.com") - assert user["reset_token"] is None - assert user["reset_token_expiry"] is None - -async def test_reset_password_fail_invalid_token(populated_user_service): - await populated_user_service.generate_reset_token("admin@example.com") - assert await populated_user_service.reset_password("admin@example.com", "invalidtoken", "newadminpass") is False - assert await populated_user_service.authenticate_user("admin@example.com", "adminpass") # Password should not change - -async def test_reset_password_fail_nonexistent_user(user_service): - # Even if a token was somehow generated for a nonexistent user (which shouldn't happen), - # reset_password should fail. - assert await user_service.reset_password("nonexistent@example.com", "anytoken", "newpass") is False - -async def test_update_user_quota_success(populated_user_service): - await populated_user_service.update_user_quota("admin@example.com", 20.5) - user = await populated_user_service.get_user_by_email("admin@example.com") - assert user["storage_quota_gb"] == 20.5 - -async def test_update_user_quota_nonexistent_user(user_service): - with pytest.raises(ValueError, match="User not found"): - await user_service.update_user_quota("nonexistent@example.com", 100)