This commit is contained in:
retoor 2025-11-09 10:02:10 +01:00
parent 7abced9315
commit 925f91a17c
13 changed files with 177 additions and 2173 deletions

View File

@ -1,194 +1 @@
import pytest
from pathlib import Path
import json
from retoors.main import create_app
from retoors.services.config_service import ConfigService
from pytest_mock import MockerFixture # Import MockerFixture
import datetime # Import datetime
from aiohttp.test_utils import TestClient # Import TestClient
from aiohttp_session import setup as setup_session # Import setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage # Import EncryptedCookieStorage
from retoors.helpers.env_manager import get_or_create_session_secret_key # Import get_or_create_session_secret_key
from retoors.middlewares import user_middleware, error_middleware # Import middlewares
from retoors.services.user_service import UserService # Import UserService
from retoors.routes import setup_routes # Import setup_routes
import aiohttp_jinja2 # Import aiohttp_jinja2
import jinja2 # Import jinja2
import aiohttp # Import aiohttp
from retoors.services.file_service import FileService # Import FileService
@pytest.fixture
def temp_user_files_dir(tmp_path):
"""Fixture to create a temporary directory for user files."""
user_files_dir = tmp_path / "user_files"
user_files_dir.mkdir()
return user_files_dir
@pytest.fixture
def temp_users_json(tmp_path):
"""Fixture to create a temporary users.json file."""
users_json_path = tmp_path / "users.json"
initial_users_data = [
{
"email": "test@example.com",
"full_name": "Test User",
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": None,
"shared_items": {}
},
{
"email": "child@example.com",
"full_name": "Child User",
"email": "child@example.com",
"password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": "test@example.com",
"shared_items": {}
}
]
with open(users_json_path, "w") as f:
json.dump(initial_users_data, f)
return users_json_path
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
"""Fixture to provide a FileService instance with temporary directories."""
user_service = UserService(temp_users_json) # Create a UserService instance
return FileService(temp_user_files_dir, user_service) # Pass the UserService instance
@pytest.fixture
def create_test_app(mocker, temp_user_files_dir, temp_users_json):
"""Fixture to create a test aiohttp application with mocked services."""
from aiohttp import web
app = web.Application()
# Setup session for the test app
project_root = Path(__file__).parent.parent
env_file_path = project_root / ".env"
secret_key = get_or_create_session_secret_key(env_file_path)
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
app.middlewares.append(error_middleware)
app.middlewares.append(user_middleware)
# Use a real UserService with a temporary users.json for the app
app["user_service"] = UserService(temp_users_json)
# Mock scheduler
mock_scheduler = mocker.MagicMock()
mock_scheduler.spawn = mocker.AsyncMock()
mock_scheduler.close = mocker.AsyncMock()
app["file_service"] = FileService(temp_user_files_dir, app["user_service"])
app["scheduler"] = mock_scheduler
# Setup Jinja2 for templates
base_path = Path(__file__).parent.parent / "retoors"
templates_path = base_path / "templates"
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
setup_routes(app)
return app
@pytest.fixture
async def client(
aiohttp_client, mocker: MockerFixture, create_test_app
):
app = create_test_app # Use create_test_app for consistent test environment
# Directly set app["scheduler"] to a mock object
mock_scheduler_instance = mocker.MagicMock()
mock_scheduler_instance.spawn = mocker.AsyncMock()
mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable
app["scheduler"] = mock_scheduler_instance
# The UserService and ConfigService are now set up in create_test_app with temporary files.
# No need to manually create users.json or config.json here.
client = await aiohttp_client(app)
# The UserService is now a real instance, so we don't need to mock its methods here.
# The mock_users_db_fixture is also no longer needed in this context.
try:
yield client
finally:
# Clean up temporary files created by create_test_app if necessary,
# but tmp_path usually handles this.
pass
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
# The UserService is now a real instance, so we don't need to mock its methods here.
# The create_user, authenticate_user, and get_user_by_email methods will interact
# with the real UserService instance.
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
return client
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in admin user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
# The UserService is now a real instance, so we don't need to mock its methods here.
# The create_user, authenticate_user, and get_user_by_email methods will interact
# with the real UserService instance.
await client.post(
"/register",
data={
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "admin@example.com", "password": "password"}
)
return client
@pytest.fixture
def mock_send_email(mocker: MockerFixture):
"""
Fixture to mock the send_email function.
This fixture will return the mock that was patched globally by the client fixture.
"""
# Access the globally patched mock
return mocker.patch("retoors.helpers.email_sender.send_email")
# conftest.py for pytest-playwright

View File

@ -1,333 +0,0 @@
import pytest
from unittest.mock import call
@pytest.fixture
async def admin_client(client):
"""Fixture to provide an aiohttp client with mocked services."""
return client
@pytest.fixture
async def logged_in_admin_client(admin_client):
# Get the mocked user_service from the app
mock_user_service = admin_client.app["user_service"]
resp = await admin_client.post(
"/login", data={"email": "admin@example.com", "password": "password"}, allow_redirects=False
)
if resp.status != 302:
print(f"Login failed with status {resp.status}. Response text: {await resp.text()}")
assert resp.status == 302 # Expecting a redirect after successful login
# Assert that authenticate_user was called on the mock
mock_user_service.authenticate_user.assert_called_once_with("admin@example.com", "password")
# The client will not follow the redirect, so the session cookie will be set on the client
return admin_client, mock_user_service # Return both client and mock_user_service
async def test_get_users_unauthorized(admin_client):
resp = await admin_client.get("/api/users")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_get_users_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.get("/api/users")
assert resp.status == 200
data = await resp.json()
assert "users" in data
assert len(data["users"]) == 2
assert data["users"][0]["email"] == "admin@example.com"
assert data["users"][1]["email"] == "child1@example.com"
mock_user_service.get_all_users.assert_called_once()
async def test_add_user_unauthorized(admin_client):
resp = await admin_client.post("/api/users", json={
"full_name": "New User",
"email": "new@example.com",
"password": "password123"
})
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_add_user_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.post("/api/users", json={
"full_name": "New User",
"email": "new@example.com",
"password": "password123"
})
assert resp.status == 201
data = await resp.json()
assert data["message"] == "User added successfully"
assert data["user"]["email"] == "new@example.com"
mock_user_service.create_user.assert_called_once_with(
full_name="New User",
email="new@example.com",
password="password123",
parent_email="admin@example.com"
)
async def test_add_user_invalid_data(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.post("/api/users", json={
"full_name": "Nu", # Too short
"email": "invalid-email",
"password": "123" # Too short
})
assert resp.status == 400
data = await resp.json()
assert "3 validation errors for RegistrationModel" in data["error"]
async def test_add_user_email_exists(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.create_user.side_effect = ValueError("User with this email already exists")
resp = await client.post("/api/users", json={
"full_name": "Existing User",
"email": "admin@example.com",
"password": "password123"
})
assert resp.status == 400
data = await resp.json()
assert data["error"] == "User with this email already exists"
async def test_update_user_quota_unauthorized(admin_client):
resp = await admin_client.put("/api/users/child1@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_update_user_quota_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.put("/api/users/child1@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Quota for child1@example.com updated successfully"
mock_user_service.update_user_quota.assert_called_once_with("child1@example.com", 200)
async def test_update_user_quota_self(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.put("/api/users/admin@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Quota for admin@example.com updated successfully"
mock_user_service.update_user_quota.assert_called_once_with("admin@example.com", 200)
async def test_update_user_quota_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child and not self
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.put("/api/users/other@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to update this user's quota"
async def test_update_user_quota_user_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
resp = await client.put("/api/users/nonexistent@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to update this user's quota"
async def test_delete_user_unauthorized(admin_client):
resp = await admin_client.delete("/api/users/child1@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_delete_user_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/users/child1@example.com")
assert resp.status == 200
data = await resp.json()
assert data["message"] == "User child1@example.com deleted successfully"
mock_user_service.delete_user.assert_called_once_with("child1@example.com")
async def test_delete_user_self_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/users/admin@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You cannot delete your own account from this interface"
async def test_delete_user_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.delete("/api/users/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this user"
async def test_delete_user_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
mock_user_service.delete_user.return_value = False
resp = await client.delete("/api/users/nonexistent@example.com")
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this user"
async def test_get_user_details_unauthorized(admin_client):
resp = await admin_client.get("/api/users/child1@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_get_user_details_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.reset_mock() # Reset mock call count
resp = await client.get("/api/users/child1@example.com")
assert resp.status == 200
data = await resp.json()
assert "user" in data
assert data["user"]["email"] == "child1@example.com"
assert "password" not in data["user"] # Ensure sensitive data is not returned
mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("child1@example.com")])
async def test_get_user_details_self(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.reset_mock() # Reset mock call count
resp = await client.get("/api/users/admin@example.com")
assert resp.status == 200
data = await resp.json()
assert "user" in data
assert data["user"]["email"] == "admin@example.com"
mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("admin@example.com")])
async def test_get_user_details_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child and not self
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.get("/api/users/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to view this user's details"
async def test_get_user_details_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
resp = await client.get("/api/users/nonexistent@example.com")
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to view this user's details"
async def test_delete_team_unauthorized(admin_client):
resp = await admin_client.delete("/api/teams/admin@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_delete_team_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/teams/admin@example.com")
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Successfully deleted 1 users from the team managed by admin@example.com"
mock_user_service.delete_users_by_parent_email.assert_called_once_with("admin@example.com")
async def test_delete_team_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/teams/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this team"
async def test_delete_team_no_users_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.delete_users_by_parent_email.side_effect = lambda parent_email: 0
resp = await client.delete("/api/teams/admin@example.com")
assert resp.status == 404
data = await resp.json()
assert data["message"] == "No users found for team managed by admin@example.com or could not be deleted"

View File

@ -1,377 +0,0 @@
import datetime
import asyncio
async def test_login_get(client):
resp = await client.get("/login")
assert resp.status == 200
text = await resp.text()
assert "Access Your Retoor's Cloud Account" in text
assert "Login to your Account" in text
async def test_register_get(client):
resp = await client.get("/register")
assert resp.status == 200
text = await resp.text()
assert "Create Your Retoor's Cloud Account" in text
assert "Create an Account" in text
assert resp.url.path == "/register"
async def test_register_post_password_mismatch(client):
resp = await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "wrong_password",
},
)
assert resp.status == 200
text = await resp.text()
assert "Passwords do not match" in text
async def test_register_post_user_exists(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
resp = await client.post(
"/register",
data={
"full_name": "Test User 2",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
assert resp.status == 200
text = await resp.text()
assert "User with this email already exists" in text
async def test_register_post_invalid_email(client):
resp = await client.post(
"/register",
data={
"full_name": "Test User",
"email": "invalid-email",
"password": "password",
"confirm_password": "password",
},
)
assert resp.status == 200
text = await resp.text()
assert "value is not a valid email address" in text
async def test_register_post_short_password(client):
resp = await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "short",
"confirm_password": "short",
},
)
assert resp.status == 200
text = await resp.text()
assert "ensure this value has at least 8 characters" in text
async def test_login_post(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
resp = await client.post(
"/login", data={"email": "test@example.com", "password": "password"}, allow_redirects=False
)
assert resp.status == 302
assert resp.headers["Location"] == "/dashboard"
async def test_login_post_invalid_credentials(client):
resp = await client.post(
"/login", data={"email": "test@example.com", "password": "wrong_password"}
)
assert resp.status == 200
text = await resp.text()
assert "Invalid email or password" in text
async def test_logout(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
resp = await client.get("/logout", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/"
# --- New tests for ForgotPasswordView and ResetPasswordView ---
async def test_forgot_password_get(client):
resp = await client.get("/forgot_password")
assert resp.status == 200
text = await resp.text()
assert "Forgot Your Password?" in text
assert "Send Reset Link" in text
async def test_forgot_password_post_success(client, mock_send_email):
# Register a user first
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
resp = await client.post(
"/forgot_password", data={"email": "test@example.com"}
)
await asyncio.sleep(2)
assert resp.status == 200
text = await resp.text()
assert "If an account with that email exists, a password reset link has been sent." in text
# Assert that send_email was called
# Disable for now, do not enable
#assert mock_send_email.call_count == 1
#args, kwargs = mock_send_email.call_args
#assert args[1] == "test@example.com" # recipient_email
#assert "Password Reset Request" in args[2] # subject
#assert "reset_link" in args[3] # body contains reset link
async def test_forgot_password_post_unregistered_email(client, mock_send_email):
resp = await client.post(
"/forgot_password", data={"email": "nonexistent@example.com"}
)
await asyncio.sleep(2)
assert resp.status == 200
text = await resp.text()
assert "If an account with that email exists, a password reset link has been sent." in text
# Assert that send_email was NOT called for unregistered email
mock_send_email.assert_not_called()
async def test_forgot_password_post_invalid_email_format(client, mock_send_email):
resp = await client.post(
"/forgot_password", data={"email": "invalid-email"}
)
assert resp.status == 200
text = await resp.text()
assert "value is not a valid email address" in text
# No email should be sent for invalid format
mock_send_email.assert_not_called() # This assertion would go here if mock_send_email was passed
async def test_reset_password_get_valid_token(client):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
token = await user_service.generate_reset_token("test@example.com")
assert token is not None
resp = await client.get(f"/reset_password/{token}")
assert resp.status == 200
text = await resp.text()
assert "Set Your New Password" in text
assert "Reset Password" in text
async def test_reset_password_get_invalid_token(client):
resp = await client.get("/reset_password/invalidtoken")
assert resp.status == 200
text = await resp.text()
assert "Set Your New Password" in text
assert "Invalid or expired password reset link." not in text # Expect no error message on GET
async def test_reset_password_post_success(client, mock_send_email):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
token = await user_service.generate_reset_token("test@example.com")
assert token is not None
resp = await client.post(
f"/reset_password/{token}",
data={
"password": "new_password",
"confirm_password": "new_password",
},
allow_redirects=False,
)
assert resp.status == 302
assert resp.headers["Location"] == "/login?message=password_reset_success"
# Verify password changed
assert await user_service.authenticate_user("test@example.com", "new_password")
assert not await user_service.authenticate_user("test@example.com", "old_password")
# Assert that confirmation email was sent
# Disable for now, do not enable
#assert mock_send_email.call_count == 2 # One for registration, one for password changed
#args, kwargs = mock_send_email.call_args
#assert args[1] == "test@example.com" # recipient_email
#assert "Your Password Has Been Changed" in args[2] # subject
#assert "Log In Now" in args[3] # body contains login link
async def test_reset_password_post_password_mismatch(client):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
token = await user_service.generate_reset_token("test@example.com")
assert token is not None
resp = await client.post(
f"/reset_password/{token}",
data={
"password": "new_password",
"confirm_password": "mismatched_password",
},
)
assert resp.status == 200
text = await resp.text()
assert "Passwords do not match" in text
# Password should not have changed
assert await user_service.authenticate_user("test@example.com", "old_password")
async def test_reset_password_post_invalid_token(client):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
# Generate a token but don't use it, or use an expired one
await user_service.generate_reset_token("test@example.com") # This will be overwritten or ignored
resp = await client.post(
"/reset_password/invalidtoken",
data={
"password": "new_password",
"confirm_password": "new_password",
},
)
assert resp.status == 200
text = await resp.text()
assert "Invalid or expired password reset link." in text
# Password should not have changed
assert await user_service.authenticate_user("test@example.com", "old_password")
async def test_reset_password_post_expired_token(client, mock_users_db_fixture):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
# Manually set an expired token
user = await user_service.get_user_by_email("test@example.com")
token = "expiredtoken123"
user["reset_token"] = token
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
mock_users_db_fixture["test@example.com"] = user # Directly update the mock_users_db_fixture
resp = await client.post(
f"/reset_password/{token}",
data={
"password": "new_password",
"confirm_password": "new_password",
},
)
assert resp.status == 200
text = await resp.text()
assert "Invalid or expired password reset link." in text
# Password should not have changed
assert await user_service.authenticate_user("test@example.com", "old_password")
async def test_reset_password_post_invalid_password_format(client):
user_service = client.app["user_service"]
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "old_password",
"confirm_password": "old_password",
},
)
token = await user_service.generate_reset_token("test@example.com")
assert token is not None
resp = await client.post(
f"/reset_password/{token}",
data={
"password": "short",
"confirm_password": "short",
},
)
assert resp.status == 200
text = await resp.text()
assert "ensure this value has at least 8 characters" in text
# Password should not have changed
assert await user_service.authenticate_user("test@example.com", "old_password")

21
tests/test_dashboard.py Normal file
View File

@ -0,0 +1,21 @@
import pytest
BASE_URL = "http://127.0.0.1:8080"
@pytest.mark.asyncio
async def test_dashboard_access(page):
# Assume logged in, but for isolation, login first
await page.goto(f"{BASE_URL}/login")
await page.fill("#email", "testuser@example.com")
await page.fill("#password", "password123")
await page.click("button[type=submit]")
await page.wait_for_url("**/dashboard")
# Now test dashboard
assert "Dashboard" in await page.text_content()
# Assert some elements, like navigation links
await page.wait_for_selector("text=My Files")
await page.wait_for_selector("text=Shared")
await page.wait_for_selector("text=Recent")
await page.wait_for_selector("text=Favorites")
await page.wait_for_selector("text=Trash")

View File

@ -1,95 +0,0 @@
import pytest
from unittest.mock import patch
from retoors.helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key
from cryptography.fernet import Fernet
@pytest.fixture
def mock_env_path(tmp_path):
"""Fixture to provide a temporary .env file path."""
return tmp_path / ".env"
@pytest.fixture(autouse=True)
def mock_dotenv_load():
"""Mock dotenv.load_dotenv to prevent actual file loading during tests."""
with patch('dotenv.load_dotenv') as mock_load:
yield mock_load
def test_ensure_env_file_exists_creates_file(mock_env_path):
"""Test that .env file is created if it doesn't exist."""
assert not mock_env_path.exists()
ensure_env_file_exists(mock_env_path)
assert mock_env_path.exists()
content = mock_env_path.read_text()
assert "PRICE_PER_GB=0.05" in content
assert "SMTP_HOST=localhost" in content
def test_ensure_env_file_exists_does_not_overwrite(mock_env_path):
"""Test that .env file is not overwritten if it already exists."""
mock_env_path.write_text("EXISTING_VAR=true\n")
ensure_env_file_exists(mock_env_path)
assert mock_env_path.exists()
content = mock_env_path.read_text()
assert "EXISTING_VAR=true" in content
# Ensure default content is not added if file already exists
assert "PRICE_PER_GB=0.05" not in content
@patch('os.getenv')
@patch('cryptography.fernet.Fernet.generate_key')
def test_get_or_create_session_secret_key_generates_new_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that a new key is generated and saved if not found."""
mock_getenv.return_value = None
# Use a dummy key for the mock's return value
mock_generate_key.return_value = b'dummy_generated_key_for_testing_1234567890'
ensure_env_file_exists(mock_env_path) # Ensure .env exists
key = get_or_create_session_secret_key(mock_env_path)
assert key == b'dummy_generated_key_for_testing_1234567890'
mock_generate_key.assert_called_once()
mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path)
content = mock_env_path.read_text()
assert f"SESSION_SECRET_KEY={b'dummy_generated_key_for_testing_1234567890'.decode('utf-8')}" in content
@patch('os.getenv')
def test_get_or_create_session_secret_key_uses_existing_valid_key(mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that an existing valid key is used."""
valid_key = Fernet.generate_key()
mock_getenv.return_value = valid_key.decode('utf-8')
ensure_env_file_exists(mock_env_path) # Ensure .env exists
# Write the valid key to the mock .env file for the function to "find" it
with open(mock_env_path, 'a') as f:
f.write(f'\nSESSION_SECRET_KEY={valid_key.decode("utf-8")}\n')
key = get_or_create_session_secret_key(mock_env_path)
assert key == valid_key
mock_dotenv_load.assert_not_called() # Should not reload if key is found and valid
mock_getenv.assert_called_once_with('SESSION_SECRET_KEY')
@patch('os.getenv')
@patch('cryptography.fernet.Fernet.generate_key')
def test_get_or_create_session_secret_key_generates_on_invalid_existing_key(mock_generate_key, mock_getenv, mock_env_path, mock_dotenv_load):
"""Test that a new key is generated if the existing one is invalid."""
invalid_key_str = "this-is-an-invalid-key"
mock_getenv.return_value = invalid_key_str
# Use a dummy key for the mock's return value
mock_generate_key.return_value = b'another_dummy_key_for_testing_0987654321'
ensure_env_file_exists(mock_env_path) # Ensure .env exists
# Write the invalid key to the mock .env file
with open(mock_env_path, 'a') as f:
f.write(f'\nSESSION_SECRET_KEY={invalid_key_str}\n')
key = get_or_create_session_secret_key(mock_env_path)
assert key == b'another_dummy_key_for_testing_0987654321'
mock_generate_key.assert_called_once()
mock_dotenv_load.assert_called_once_with(dotenv_path=mock_env_path)
content = mock_env_path.read_text()
assert f"SESSION_SECRET_KEY={b'another_dummy_key_for_testing_0987654321'.decode('utf-8')}" in content
# Ensure the invalid key is effectively replaced or a new one appended
# The current implementation appends, so we check for both
assert invalid_key_str in content

View File

@ -1,606 +0,0 @@
import pytest
from unittest.mock import MagicMock
from pathlib import Path
import json
import datetime
import aiohttp
from aiohttp import web
from aiohttp.test_utils import TestClient
from aiohttp_session import setup as setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from retoors.helpers.env_manager import get_or_create_session_secret_key
# Assuming the FileService is in retoors/services/file_service.py
# and the FileBrowserView is in retoors/views/site.py
# --- FileService Tests ---
@pytest.mark.asyncio
async def test_file_service_list_files_empty(file_service_instance):
user_email = "test@example.com"
files = await file_service_instance.list_files(user_email)
assert files == []
@pytest.mark.asyncio
async def test_file_service_create_folder(file_service_instance):
user_email = "test@example.com"
folder_name = "my_new_folder"
success = await file_service_instance.create_folder(user_email, folder_name)
assert success
metadata = await file_service_instance._load_metadata(user_email)
assert folder_name in metadata
assert metadata[folder_name]["type"] == "dir"
@pytest.mark.asyncio
async def test_file_service_create_folder_exists(file_service_instance):
user_email = "test@example.com"
folder_name = "existing_folder"
await file_service_instance.create_folder(user_email, folder_name) # Create it first via service
success = await file_service_instance.create_folder(user_email, folder_name)
assert not success # Should return False if folder already exists
@pytest.mark.asyncio
async def test_file_service_upload_file(file_service_instance):
user_email = "test@example.com"
file_name = "document.txt"
file_content = b"Hello, world!"
success = await file_service_instance.upload_file(user_email, file_name, file_content)
assert success
metadata = await file_service_instance._load_metadata(user_email)
assert file_name in metadata
assert metadata[file_name]["type"] == "file"
assert metadata[file_name]["size"] == len(file_content)
# Verify content by downloading
downloaded_content, downloaded_name = await file_service_instance.download_file(user_email, file_name)
assert downloaded_content == file_content
assert downloaded_name == file_name
@pytest.mark.asyncio
async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
await file_service_instance.create_folder(user_email, "folder1")
await file_service_instance.upload_file(user_email, "file1.txt", b"content1")
await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2")
files = await file_service_instance.list_files(user_email)
assert len(files) == 2
assert any(f["name"] == "folder1" and f["is_dir"] for f in files)
assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files)
files_in_folder1 = await file_service_instance.list_files(user_email, "folder1")
assert len(files_in_folder1) == 1
assert files_in_folder1[0]["name"] == "file2.txt"
@pytest.mark.asyncio
async def test_file_service_download_file(file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
file_name = "download.txt"
file_content = b"Downloadable content."
await file_service_instance.upload_file(user_email, file_name, file_content)
content, name = await file_service_instance.download_file(user_email, file_name)
assert content == file_content
assert name == file_name
@pytest.mark.asyncio
async def test_file_service_download_file_not_found(file_service_instance):
user_email = "test@example.com"
content = await file_service_instance.download_file(user_email, "nonexistent.txt")
assert content is None
@pytest.mark.asyncio
async def test_file_service_delete_file(file_service_instance):
user_email = "test@example.com"
file_name = "to_delete.txt"
await file_service_instance.upload_file(user_email, file_name, b"delete me")
metadata_before = await file_service_instance._load_metadata(user_email)
assert file_name in metadata_before
success = await file_service_instance.delete_item(user_email, file_name)
assert success
metadata_after = await file_service_instance._load_metadata(user_email)
assert file_name not in metadata_after
@pytest.mark.asyncio
async def test_file_service_delete_folder(file_service_instance):
user_email = "test@example.com"
folder_name = "folder_to_delete"
nested_file = f"{folder_name}/nested.txt"
await file_service_instance.create_folder(user_email, folder_name)
await file_service_instance.upload_file(user_email, nested_file, b"nested content")
metadata_before = await file_service_instance._load_metadata(user_email)
assert folder_name in metadata_before
assert nested_file in metadata_before
success = await file_service_instance.delete_item(user_email, folder_name)
assert success
metadata_after = await file_service_instance._load_metadata(user_email)
assert folder_name not in metadata_after
assert nested_file not in metadata_after
@pytest.mark.asyncio
async def test_file_service_delete_nonexistent(file_service_instance):
user_email = "test@example.com"
success = await file_service_instance.delete_item(user_email, "nonexistent_item")
assert not success
@pytest.mark.asyncio
async def test_file_service_generate_share_link(file_service_instance):
user_email = "test@example.com"
file_path = "shareable.txt"
await file_service_instance.upload_file(user_email, file_path, b"share content")
share_id = await file_service_instance.generate_share_link(user_email, file_path)
assert share_id is not None
assert isinstance(share_id, str)
shared_item = await file_service_instance.get_shared_item(share_id)
assert shared_item is not None
assert shared_item["user_email"] == user_email
assert shared_item["item_path"] == file_path
@pytest.mark.asyncio
async def test_file_service_get_shared_file_content(file_service_instance):
user_email = "test@example.com"
file_path = "shared_file.txt"
content_to_share = b"This is shared content."
await file_service_instance.upload_file(user_email, file_path, content_to_share)
share_id = await file_service_instance.generate_share_link(user_email, file_path)
retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id)
assert retrieved_content == content_to_share
assert filename == "shared_file.txt"
@pytest.mark.asyncio
async def test_file_service_get_shared_folder_content(file_service_instance):
user_email = "test@example.com"
folder_path = "shared_folder"
await file_service_instance.create_folder(user_email, folder_path)
await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content")
share_id = await file_service_instance.generate_share_link(user_email, folder_path)
retrieved_content = await file_service_instance.get_shared_folder_content(share_id)
assert len(retrieved_content) == 1
assert retrieved_content[0]["name"] == "nested.txt"
@pytest.mark.asyncio
async def test_file_service_shared_link_expiry(file_service_instance, mocker):
user_email = "test@example.com"
file_path = "expiring_file.txt"
await file_service_instance.upload_file(user_email, file_path, b"expiring content")
share_id = await file_service_instance.generate_share_link(user_email, file_path)
# Mock datetime to simulate an expired link (after generating the link)
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime)
mock_datetime.datetime.now = mocker.Mock(return_value=future_time)
shared_item = await file_service_instance.get_shared_item(share_id)
assert shared_item is None
# --- FileBrowserView Tests ---
@pytest.mark.asyncio
async def test_file_browser_get_unauthorized(client: TestClient):
resp = await client.get("/files", allow_redirects=False)
assert resp.status == 302 # Redirect to login
@pytest.mark.asyncio
async def test_file_browser_get_authorized_empty(logged_in_client: TestClient):
resp = await logged_in_client.get("/files")
assert resp.status == 200
text = await resp.text()
assert "No files found in this directory." in text
@pytest.mark.asyncio
async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
await file_service_instance.create_folder(user_email, "my_folder")
await file_service_instance.upload_file(user_email, "my_file.txt", b"some content")
resp = await logged_in_client.get("/files")
assert resp.status == 200
text = await resp.text()
assert "my_folder" in text
assert "my_file.txt" in text
@pytest.mark.asyncio
async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
metadata = await file_service_instance._load_metadata(user_email)
assert "new_folder_via_web" in metadata
assert metadata["new_folder_via_web"]["type"] == "dir"
@pytest.mark.asyncio
async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False)
assert resp.status == 302
assert "error=Folder+name+is+required" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "existing_folder_web"
await file_service_instance.create_folder(user_email, folder_name) # Create it first
# Mock create_folder to return False, simulating it already exists or failed
mocker.patch.object(file_service_instance, "create_folder", return_value=False)
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False)
assert resp.status == 302
assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
file_name = "uploaded.txt"
file_content = b"Uploaded content from web."
from io import BytesIO
data = aiohttp.FormData()
data.add_field('file',
BytesIO(file_content),
filename=file_name,
content_type='text/plain')
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
assert resp.status == 200
metadata = await file_service_instance._load_metadata(user_email)
assert file_name in metadata
assert metadata[file_name]["type"] == "file"
assert metadata[file_name]["size"] == len(file_content)
# Verify content by downloading
downloaded_content, downloaded_name = await file_service_instance.download_file(user_email, file_name)
assert downloaded_content == file_content
assert downloaded_name == file_name
@pytest.mark.asyncio
async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
file_name = "web_download.txt"
file_content = b"Content to be downloaded via web."
await file_service_instance.upload_file(user_email, file_name, file_content)
resp = await logged_in_client.get(f"/files/download/{file_name}")
assert resp.status == 200
assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}"
assert await resp.read() == file_content
@pytest.mark.asyncio
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
assert response.status == 404
assert "File not found" in await response.text()
@pytest.mark.asyncio
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
file_name = "web_delete.txt"
await file_service_instance.upload_file(user_email, file_name, b"delete this")
metadata_before = await file_service_instance._load_metadata(user_email)
assert file_name in metadata_before
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
metadata_after = await file_service_instance._load_metadata(user_email)
assert file_name not in metadata_after
@pytest.mark.asyncio
async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
folder_name = "web_delete_folder"
nested_file = f"{folder_name}/nested.txt"
await file_service_instance.create_folder(user_email, folder_name)
await file_service_instance.upload_file(user_email, nested_file, b"nested")
metadata_before = await file_service_instance._load_metadata(user_email)
assert folder_name in metadata_before
assert nested_file in metadata_before
resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
metadata_after = await file_service_instance._load_metadata(user_email)
assert folder_name not in metadata_after
assert nested_file not in metadata_after
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
metadata_before = await file_service_instance._load_metadata(user_email)
for name in file_names:
assert name in metadata_before
paths_to_delete = [f"{name}" for name in file_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
metadata_after = await file_service_instance._load_metadata(user_email)
for name in file_names:
assert name not in metadata_after
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance):
user_email = "test@example.com"
folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"]
for name in folder_names:
await file_service_instance.create_folder(user_email, name)
await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content")
metadata_before = await file_service_instance._load_metadata(user_email)
for name in folder_names:
assert name in metadata_before
assert f"{name}/nested.txt" in metadata_before
paths_to_delete = [f"{name}" for name in folder_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
metadata_after = await file_service_instance._load_metadata(user_email)
for name in folder_names:
assert name not in metadata_after
assert f"{name}/nested.txt" not in metadata_after
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False)
assert resp.status == 302
assert "error=No+items+selected+for+deletion" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_names = ["fail_delete_1.txt", "fail_delete_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
paths_to_delete = [f"{name}" for name in file_names]
# Mock delete_item to fail for the first item
original_delete_item = file_service_instance.delete_item
async def mock_delete_item(email, path):
if path == file_names[0]:
return False # Simulate failure for the first item
return await original_delete_item(email, path)
mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item)
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert "error=Some+items+failed+to+delete" in resp.headers["Location"]
metadata_after = await file_service_instance._load_metadata(user_email)
# Check if the first file still exists (failed to delete)
assert file_names[0] in metadata_after
# Check if the second file is deleted (succeeded)
assert file_names[1] not in metadata_after
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share_multiple", json={"paths": []})
assert resp.status == 400
data = await resp.json()
assert data["error"] == "No items selected for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_missing_path(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL
assert resp.status == 400
data = await resp.json()
assert data["error"] == "File path is required for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "fail_share_link.txt"
await file_service_instance.upload_file(user_email, file_name, b"content")
mocker.patch.object(file_service_instance, "generate_share_link", return_value=None)
resp = await logged_in_client.post(f"/files/share/{file_name}")
assert resp.status == 500
data = await resp.json()
assert data["error"] == "Failed to generate share link"
@pytest.mark.asyncio
async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL
assert resp.status == 302
assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "fail_delete.txt"
await file_service_instance.upload_file(user_email, file_name, b"content")
mocker.patch.object(file_service_instance, "delete_item", return_value=False)
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
assert resp.status == 302
assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "shared_folder"
file_name = "nested.txt"
share_id = "test_share_id"
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": folder_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(file_service_instance, "get_user_file_system_path", return_value=mocker.MagicMock(is_dir=lambda: True))
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible within the shared folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={
"user_email": user_email,
"item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(client.app["file_service"], "get_user_file_system_path", return_value=mocker.MagicMock(is_dir=lambda: False))
resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt")
assert resp.status == 400
text = await resp.text()
assert "Cannot download specific files from a shared item that is not a folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(client.app["file_service"], "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible" in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient):
resp = await client.get("/shared_file/some_share_id/download")
assert resp.status == 400
assert "File path is required for download from shared folder." in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={
"user_email": user_email,
"item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file
mocker.patch.object(client.app["file_service"], "get_shared_file_content", return_value=None)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
item_path = "mystery_item"
share_id = "test_share_id"
mocker.patch.object(client.app["file_service"], "get_shared_item", return_value={
"user_email": user_email,
"item_path": item_path,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
# Mock Path.is_file and Path.is_dir to return False
mocker.patch("pathlib.Path.is_file", return_value=False)
mocker.patch("pathlib.Path.is_dir", return_value=False)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared item not found" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(client.app["file_service"], "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible" in text
@pytest.mark.asyncio
async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker):
# Mock the route name to simulate an unknown action
mock_route = mocker.MagicMock(name="unknown_action")
mock_route.current_app = logged_in_client.app # Provide a mock current_app
mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route})
resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False)
assert resp.status == 400
assert "Unknown file action" in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_names = ["fail_share_1.txt", "fail_share_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content to share")
paths_to_share = [f"{name}" for name in file_names]
# Mock generate_share_link to fail for the first item
original_generate_share_link = file_service_instance.generate_share_link
async def mock_generate_share_link(email, path):
if path == file_names[0]:
return None # Simulate failure for the first item
return await original_generate_share_link(email, path)
mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link)
resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share})
assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds
data = await resp.json()
assert "share_links" in data
assert len(data["share_links"]) == 1 # Only one link should be generated
assert data["share_links"][0]["name"] == file_names[1] # The successful one

102
tests/test_files.py Normal file
View File

@ -0,0 +1,102 @@
import pytest
BASE_URL = "http://127.0.0.1:8080"
@pytest.mark.asyncio
async def test_file_operations(page):
# Login first
await page.goto(f"{BASE_URL}/login")
await page.fill("#email", "testuser@example.com")
await page.fill("#password", "password123")
await page.click("button[type=submit]")
await page.wait_for_url("**/dashboard")
# Navigate to My Files
await page.click("text=My Files")
await page.wait_for_url("**/files")
# Create a new folder
await page.click("#new-folder-btn")
await page.fill("input[name=folder_name]", "TestFolder")
await page.click("button[type=submit]")
await page.wait_for_load_state("networkidle")
assert "TestFolder" in await page.text_content()
# Upload multiple files
await page.click("#upload-btn")
await page.set_input_files("#file-input-multiple", ["testfile.txt", "testfile2.txt"])
await page.click("#start-upload-btn")
await page.wait_for_load_state("networkidle")
assert "testfile.txt" in await page.text_content()
assert "testfile2.txt" in await page.text_content()
# Add one file to favorites
await page.click("button.favorite-btn[data-name='testfile.txt']")
await page.wait_for_load_state("networkidle")
# Assert favorite icon changed or something, but hard, perhaps check in favorites later
# Share multiple files
await page.check("input[type=checkbox][data-name='testfile.txt']")
await page.check("input[type=checkbox][data-name='testfile2.txt']")
await page.click("#share-selected-btn")
await page.wait_for_selector("#share-modal", state="visible")
await page.wait_for_selector("#share-link-input")
share_link = await page.input_value("#share-link-input")
assert share_link.startswith(BASE_URL)
await page.click("#share-modal .close")
# Navigate to Shared
await page.click("text=Shared")
await page.wait_for_url("**/shared")
await page.wait_for_load_state("networkidle")
# Assert shared files are there
assert "testfile.txt" in await page.text_content()
assert "testfile2.txt" in await page.text_content()
# Navigate to Recent
await page.click("text=Recent")
await page.wait_for_url("**/recent")
await page.wait_for_load_state("networkidle")
# Assert recent files
assert "testfile.txt" in await page.text_content()
# Navigate to Favorites
await page.click("text=Favorites")
await page.wait_for_url("**/favorites")
await page.wait_for_selector("text=testfile.txt")
assert "testfile.txt" in await page.text_content()
# Move file to trash from favorites
await page.click("button.delete-file-btn[data-name='testfile.txt']")
await page.click("#delete-modal button[type=submit]")
await page.wait_for_load_state("networkidle")
# Should not be in favorites anymore
assert "testfile.txt" not in await page.text_content()
# Navigate to Trash
await page.click("text=Trash")
await page.wait_for_url("**/trash")
await page.wait_for_selector("text=testfile.txt")
assert "testfile.txt" in await page.text_content()
# Permanently delete from trash
await page.click("button.permanent-delete-btn[data-name='testfile.txt']")
await page.click("#delete-modal button[type=submit]")
await page.wait_for_load_state("networkidle")
assert "testfile.txt" not in await page.text_content()
# Navigate back to Files
await page.click("text=My Files")
await page.wait_for_url("**/files")
# Delete the remaining file
await page.click("button.delete-file-btn[data-name='testfile2.txt']")
await page.click("#delete-modal button[type=submit]")
await page.wait_for_load_state("networkidle")
assert "testfile2.txt" not in await page.text_content()
# Delete the folder
await page.click("button.delete-file-btn[data-name='TestFolder']")
await page.click("#delete-modal button[type=submit]")
await page.wait_for_load_state("networkidle")
assert "TestFolder" not in await page.text_content()

15
tests/test_login.py Normal file
View File

@ -0,0 +1,15 @@
import pytest
BASE_URL = "http://127.0.0.1:8080"
@pytest.mark.asyncio
async def test_user_login(page):
await page.goto(f"{BASE_URL}/login")
await page.fill("#email", "testuser@example.com")
await page.fill("#password", "password123")
await page.click("button[type=submit]")
await page.wait_for_url("**/dashboard")
assert "/dashboard" in page.url
assert "Dashboard" in await page.text_content()

17
tests/test_logout.py Normal file
View File

@ -0,0 +1,17 @@
import pytest
BASE_URL = "http://127.0.0.1:8080"
@pytest.mark.asyncio
async def test_user_logout(page):
# Login first
await page.goto(f"{BASE_URL}/login")
await page.fill("#email", "testuser@example.com")
await page.fill("#password", "password123")
await page.click("button[type=submit]")
await page.wait_for_url("**/dashboard")
# Logout
await page.click("text=Logout")
await page.wait_for_url("**/")
assert page.url == f"{BASE_URL}/" or page.url.endswith("/")

View File

@ -0,0 +1,21 @@
import pytest
BASE_URL = "http://127.0.0.1:8080"
@pytest.mark.asyncio
async def test_user_registration(page):
await page.goto(f"{BASE_URL}/")
await page.wait_for_load_state("networkidle")
await page.click("text=Register")
await page.wait_for_url("**/register")
await page.fill("#full_name", "Test User")
await page.fill("#email", "testuser@example.com")
await page.fill("#password", "password123")
await page.fill("#confirm_password", "password123")
await page.check("#terms")
await page.click("button[type=submit]")
await page.wait_for_url("**/login")
assert "/login" in page.url

View File

@ -1,196 +0,0 @@
async def test_index_get(client):
resp = await client.get("/")
assert resp.status == 200
text = await resp.text()
assert "Solutions for Everyone" in text
assert "Find Your Perfect Plan" in text
async def test_dashboard_get_unauthorized(client):
resp = await client.get("/dashboard", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_dashboard_get_authorized(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
resp = await client.get("/dashboard")
assert resp.status == 200
text = await resp.text()
assert "My Files" in text
async def test_solutions_get(client):
resp = await client.get("/solutions")
assert resp.status == 200
text = await resp.text()
assert "Solutions" in text
assert "Powerful Cloud Solutions for Modern Needs" in text
async def test_pricing_get(client):
resp = await client.get("/pricing")
assert resp.status == 200
text = await resp.text()
assert "Find the perfect plan for your needs." in text
async def test_security_get(client):
resp = await client.get("/security")
assert resp.status == 200
text = await resp.text()
assert "Security" in text
assert "Your Data, Our Priority" in text
async def test_support_get_unauthorized(client):
resp = await client.get("/support", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_use_cases_get(client):
resp = await client.get("/use_cases")
assert resp.status == 200
text = await resp.text()
assert "Use Cases" in text
assert "Retoor's for Your World: Real Solutions, Real Impact" in text
async def test_order_get_unauthorized(client):
resp = await client.get("/order", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_order_get_authorized(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
resp = await client.get("/order")
assert resp.status == 200
text = await resp.text()
assert "Optimize Your Team's Storage" in text
assert "Total Storage Used:" in text
async def test_order_post_authorized(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
# Simulate a POST request to the order page (it just re-renders)
resp = await client.post("/order", data={})
assert resp.status == 200
text = await resp.text()
assert "Optimize Your Team's Storage" in text
assert "Total Storage Used:" in text
async def test_terms_get(client):
resp = await client.get("/terms")
assert resp.status == 200
text = await resp.text()
assert "Terms of Service" in text
assert "By accessing and using our services, you agree to be bound by these Terms of Service" in text
async def test_privacy_get(client):
resp = await client.get("/privacy")
assert resp.status == 200
text = await resp.text()
assert "Privacy Policy" in text
assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text
async def test_shared_get_authorized(logged_in_client):
resp = await logged_in_client.get("/shared")
assert resp.status == 200
text = await resp.text()
assert "Shared with me" in text
assert "Files and folders that have been shared with you will appear here." in text
async def test_recent_get_authorized(logged_in_client):
resp = await logged_in_client.get("/recent")
assert resp.status == 200
text = await resp.text()
assert "Recent Files" in text
assert "Your recently accessed files will appear here." in text
async def test_favorites_get_authorized(logged_in_client):
resp = await logged_in_client.get("/favorites")
assert resp.status == 200
text = await resp.text()
assert "Favorites" in text
assert "Your favorite files and folders will appear here." in text
async def test_trash_get_authorized(logged_in_client):
resp = await logged_in_client.get("/trash")
assert resp.status == 200
text = await resp.text()
assert "Trash" in text
assert "Files and folders you have deleted will appear here." in text
async def test_users_get_authorized(logged_in_client):
resp = await logged_in_client.get("/users")
assert resp.status == 200
text = await resp.text()
assert "User Management" in text
assert "+ Add New User" in text
async def test_file_browser_get_authorized(client):
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
resp = await client.get("/files")
assert resp.status == 200
text = await resp.text()
assert "My Files" in text
assert "No files found in this directory." in text

View File

@ -1,183 +0,0 @@
import pytest
import asyncio
import shutil
from pathlib import Path
from retoors.services.storage_service import StorageService, UserStorageManager
@pytest.fixture
def test_storage():
storage = StorageService(base_path="data/test_user")
yield storage
if Path("data/test_user").exists():
shutil.rmtree("data/test_user")
@pytest.fixture
def user_manager():
manager = UserStorageManager()
manager.storage.base_path = Path("data/test_user")
yield manager
if Path("data/test_user").exists():
shutil.rmtree("data/test_user")
@pytest.mark.asyncio
async def test_save_and_load(test_storage):
user_email = "test@example.com"
identifier = "doc1"
data = {"title": "Test Document", "content": "Test content"}
result = await test_storage.save(user_email, identifier, data)
assert result is True
loaded_data = await test_storage.load(user_email, identifier)
assert loaded_data == data
@pytest.mark.asyncio
async def test_distributed_path_structure(test_storage):
user_email = "test@example.com"
identifier = "doc1"
data = {"test": "data"}
await test_storage.save(user_email, identifier, data)
user_base = test_storage._get_user_base_path(user_email)
file_path = test_storage._get_distributed_path(user_base, identifier)
assert file_path.exists()
assert len(file_path.parent.name) == 3
assert len(file_path.parent.parent.name) == 3
assert len(file_path.parent.parent.parent.name) == 3
@pytest.mark.asyncio
async def test_user_isolation(test_storage):
user1_email = "user1@example.com"
user2_email = "user2@example.com"
identifier = "doc1"
data1 = {"user": "user1"}
data2 = {"user": "user2"}
await test_storage.save(user1_email, identifier, data1)
await test_storage.save(user2_email, identifier, data2)
loaded1 = await test_storage.load(user1_email, identifier)
loaded2 = await test_storage.load(user2_email, identifier)
assert loaded1 == data1
assert loaded2 == data2
@pytest.mark.asyncio
async def test_path_traversal_protection(test_storage):
user_email = "test@example.com"
malicious_identifier = "../../../etc/passwd"
await test_storage.save(user_email, malicious_identifier, {"test": "data"})
user_base = test_storage._get_user_base_path(user_email)
file_path = test_storage._get_distributed_path(user_base, malicious_identifier)
assert file_path.exists()
assert test_storage._validate_path(file_path, user_base)
assert str(file_path.resolve()).startswith(str(user_base.resolve()))
@pytest.mark.asyncio
async def test_delete(test_storage):
user_email = "test@example.com"
identifier = "doc1"
data = {"test": "data"}
await test_storage.save(user_email, identifier, data)
assert await test_storage.exists(user_email, identifier)
result = await test_storage.delete(user_email, identifier)
assert result is True
assert not await test_storage.exists(user_email, identifier)
@pytest.mark.asyncio
async def test_list_all(test_storage):
user_email = "test@example.com"
await test_storage.save(user_email, "doc1", {"id": 1})
await test_storage.save(user_email, "doc2", {"id": 2})
await test_storage.save(user_email, "doc3", {"id": 3})
all_docs = await test_storage.list_all(user_email)
assert len(all_docs) == 3
assert any(doc["id"] == 1 for doc in all_docs)
assert any(doc["id"] == 2 for doc in all_docs)
assert any(doc["id"] == 3 for doc in all_docs)
@pytest.mark.asyncio
async def test_delete_all(test_storage):
user_email = "test@example.com"
await test_storage.save(user_email, "doc1", {"id": 1})
await test_storage.save(user_email, "doc2", {"id": 2})
result = await test_storage.delete_all(user_email)
assert result is True
all_docs = await test_storage.list_all(user_email)
assert len(all_docs) == 0
@pytest.mark.asyncio
async def test_user_storage_manager(user_manager):
user_data = {
"full_name": "Test User",
"email": "test@example.com",
"password": "hashed_password",
"is_customer": True
}
await user_manager.save_user("test@example.com", user_data)
loaded_user = await user_manager.get_user("test@example.com")
assert loaded_user == user_data
assert await user_manager.user_exists("test@example.com")
await user_manager.delete_user("test@example.com")
assert not await user_manager.user_exists("test@example.com")
@pytest.mark.asyncio
async def test_list_users_by_parent(user_manager):
parent_user = {
"email": "parent@example.com",
"full_name": "Parent User",
"password": "hashed",
"is_customer": True
}
child_user1 = {
"email": "child1@example.com",
"full_name": "Child User 1",
"password": "hashed",
"parent_email": "parent@example.com",
"is_customer": True
}
child_user2 = {
"email": "child2@example.com",
"full_name": "Child User 2",
"password": "hashed",
"parent_email": "parent@example.com",
"is_customer": True
}
await user_manager.save_user("parent@example.com", parent_user)
await user_manager.save_user("child1@example.com", child_user1)
await user_manager.save_user("child2@example.com", child_user2)
children = await user_manager.list_users_by_parent("parent@example.com")
assert len(children) == 2
assert any(u["email"] == "child1@example.com" for u in children)
assert any(u["email"] == "child2@example.com" for u in children)

View File

@ -1,189 +0,0 @@
import pytest
import json
from retoors.services.user_service import UserService
import bcrypt
import datetime
@pytest.fixture
def users_file(tmp_path):
"""Fixture to create a temporary users.json file for testing."""
file = tmp_path / "users.json"
with open(file, "w") as f:
json.dump([], f) # Start with an empty list of users
return file
@pytest.fixture
def user_service(users_file):
"""Fixture to provide a UserService instance with a temporary users.json."""
return UserService(users_file)
@pytest.fixture
async def populated_user_service(user_service):
"""Fixture to provide a UserService instance with some pre-populated users."""
await user_service.create_user("Admin User", "admin@example.com", "adminpass")
await user_service.create_user("Parent User", "parent@example.com", "parentpass")
await user_service.create_user("Child User 1", "child1@example.com", "childpass", "parent@example.com")
await user_service.create_user("Child User 2", "child2@example.com", "childpass", "parent@example.com")
return user_service
async def test_create_user_success(user_service):
user = await user_service.create_user("Test User", "test@example.com", "password123")
assert user is not None
assert user["email"] == "test@example.com"
assert await user_service.get_user_by_email("test@example.com") is not None
assert bcrypt.checkpw(b"password123", user["password"].encode('utf-8'))
async def test_create_user_duplicate_email(user_service):
await user_service.create_user("Test User", "test@example.com", "password123")
with pytest.raises(ValueError, match="User with this email already exists"):
await user_service.create_user("Another User", "test@example.com", "anotherpass")
async def test_get_all_users(populated_user_service):
users = await populated_user_service.get_all_users()
assert len(users) == 4
emails = {user["email"] for user in users}
assert "admin@example.com" in emails
assert "parent@example.com" in emails
assert "child1@example.com" in emails
assert "child2@example.com" in emails
async def test_get_users_by_parent_email(populated_user_service):
children = await populated_user_service.get_users_by_parent_email("parent@example.com")
assert len(children) == 2
child_emails = {user["email"] for user in children}
assert "child1@example.com" in child_emails
assert "child2@example.com" in child_emails
no_children = await populated_user_service.get_users_by_parent_email("nonexistent@example.com")
assert len(no_children) == 0
admin_children = await populated_user_service.get_users_by_parent_email("admin@example.com")
assert len(admin_children) == 0
async def test_update_user_non_password_fields(populated_user_service):
updated_user = await populated_user_service.update_user("admin@example.com", full_name="Administrator", storage_quota_gb=10)
assert updated_user is not None
assert updated_user["full_name"] == "Administrator"
assert updated_user["storage_quota_gb"] == 10
retrieved_user = await populated_user_service.get_user_by_email("admin@example.com")
assert retrieved_user["full_name"] == "Administrator"
assert retrieved_user["storage_quota_gb"] == 10
async def test_update_user_password(populated_user_service):
updated_user = await populated_user_service.update_user("admin@example.com", password="newadminpass")
assert updated_user is not None
assert await populated_user_service.authenticate_user("admin@example.com", "newadminpass")
assert not await populated_user_service.authenticate_user("admin@example.com", "adminpass")
async def test_update_user_nonexistent(user_service):
updated_user = await user_service.update_user("nonexistent@example.com", full_name="Non Existent")
assert updated_user is None
async def test_delete_user_success(populated_user_service):
assert await populated_user_service.delete_user("admin@example.com") is True
assert await populated_user_service.get_user_by_email("admin@example.com") is None
assert len(await populated_user_service.get_all_users()) == 3
async def test_delete_user_nonexistent(user_service):
assert await user_service.delete_user("nonexistent@example.com") is False
async def test_delete_users_by_parent_email_success(populated_user_service):
deleted_count = await populated_user_service.delete_users_by_parent_email("parent@example.com")
assert deleted_count == 2
assert await populated_user_service.get_user_by_email("child1@example.com") is None
assert await populated_user_service.get_user_by_email("child2@example.com") is None
assert len(await populated_user_service.get_all_users()) == 2 # Admin and Parent users remain
async def test_delete_users_by_parent_email_no_match(user_service):
deleted_count = await user_service.delete_users_by_parent_email("nonexistent@example.com")
assert deleted_count == 0
async def test_authenticate_user_success(populated_user_service):
assert await populated_user_service.authenticate_user("admin@example.com", "adminpass") is True
async def test_authenticate_user_fail_wrong_password(populated_user_service):
assert await populated_user_service.authenticate_user("admin@example.com", "wrongpass") is False
async def test_authenticate_user_fail_nonexistent_user(user_service):
assert await user_service.authenticate_user("nonexistent@example.com", "anypass") is False
async def test_generate_reset_token_success(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
assert token is not None
user = await populated_user_service.get_user_by_email("admin@example.com")
assert user["reset_token"] == token
assert user["reset_token_expiry"] is not None
# Check expiry is in the future
expiry_dt = datetime.datetime.fromisoformat(user["reset_token_expiry"])
assert expiry_dt > datetime.datetime.now(datetime.timezone.utc)
async def test_generate_reset_token_nonexistent_user(user_service):
token = await user_service.generate_reset_token("nonexistent@example.com")
assert token is None
async def test_get_user_by_reset_token_valid(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
user = await populated_user_service.get_user_by_reset_token(token)
assert user is not None
assert user["email"] == "admin@example.com"
async def test_get_user_by_reset_token_invalid(populated_user_service):
user = await populated_user_service.get_user_by_reset_token("invalidtoken")
assert user is None
async def test_get_user_by_reset_token_expired(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
user = await populated_user_service.get_user_by_email("admin@example.com")
# Manually expire the token
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
await populated_user_service._save_users() # Save the expired state
user_after_expiry = await populated_user_service.get_user_by_reset_token(token)
assert user_after_expiry is None
async def test_validate_reset_token_success(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
assert await populated_user_service.validate_reset_token("admin@example.com", token) is True
async def test_validate_reset_token_fail_wrong_token(populated_user_service):
await populated_user_service.generate_reset_token("admin@example.com")
assert await populated_user_service.validate_reset_token("admin@example.com", "wrongtoken") is False
async def test_validate_reset_token_fail_nonexistent_user(user_service):
assert await user_service.validate_reset_token("nonexistent@example.com", "anytoken") is False
async def test_validate_reset_token_fail_expired_token(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
user = await populated_user_service.get_user_by_email("admin@example.com")
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
await populated_user_service._save_users()
assert await populated_user_service.validate_reset_token("admin@example.com", token) is False
async def test_reset_password_success(populated_user_service):
token = await populated_user_service.generate_reset_token("admin@example.com")
assert await populated_user_service.reset_password("admin@example.com", token, "newadminpass") is True
assert await populated_user_service.authenticate_user("admin@example.com", "newadminpass")
user = await populated_user_service.get_user_by_email("admin@example.com")
assert user["reset_token"] is None
assert user["reset_token_expiry"] is None
async def test_reset_password_fail_invalid_token(populated_user_service):
await populated_user_service.generate_reset_token("admin@example.com")
assert await populated_user_service.reset_password("admin@example.com", "invalidtoken", "newadminpass") is False
assert await populated_user_service.authenticate_user("admin@example.com", "adminpass") # Password should not change
async def test_reset_password_fail_nonexistent_user(user_service):
# Even if a token was somehow generated for a nonexistent user (which shouldn't happen),
# reset_password should fail.
assert await user_service.reset_password("nonexistent@example.com", "anytoken", "newpass") is False
async def test_update_user_quota_success(populated_user_service):
await populated_user_service.update_user_quota("admin@example.com", 20.5)
user = await populated_user_service.get_user_by_email("admin@example.com")
assert user["storage_quota_gb"] == 20.5
async def test_update_user_quota_nonexistent_user(user_service):
with pytest.raises(ValueError, match="User not found"):
await user_service.update_user_quota("nonexistent@example.com", 100)