Raised coverage.

This commit is contained in:
retoor 2025-11-08 22:05:07 +01:00
parent 1cdd8fba11
commit fc10b535b2
4 changed files with 554 additions and 15 deletions

View File

@ -256,4 +256,4 @@ class LogoutView(web.View):
async def get(self): async def get(self):
session = await get_session(self.request) session = await get_session(self.request)
session.clear() session.clear()
raise web.HTTPFound("/") raise web.HTTPFound("/")

View File

@ -4,19 +4,60 @@ import json
from retoors.main import create_app from retoors.main import create_app
from retoors.services.user_service import UserService from retoors.services.user_service import UserService
from retoors.services.config_service import ConfigService from retoors.services.config_service import ConfigService
from pytest_mock import MockerFixture # Import MockerFixture from pytest_mock import MockerFixture # Import MockerFixture
from unittest import mock # For AsyncMock from unittest import mock # For AsyncMock
import aiojobs # Import aiojobs to patch it import aiojobs # Import aiojobs to patch it
import datetime # Import datetime
@pytest.fixture @pytest.fixture
async def client(aiohttp_client, mocker: MockerFixture): def create_app_instance():
app = create_app() # Define app here """Fixture to create a new aiohttp application instance."""
return create_app()
@pytest.fixture(scope="function")
def mock_users_db_fixture():
"""
Fixture to simulate a user database for dynamic mocking,
reset for each test function.
"""
return {
"admin@example.com": {
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password", # Store plain password for mock authentication
"hashed_password": "hashed_password", # For consistency with real service
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None,
"reset_token": None,
"reset_token_expiry": None,
},
"child1@example.com": {
"full_name": "Child User 1",
"email": "child1@example.com",
"password": "password",
"hashed_password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "admin@example.com",
"reset_token": None,
"reset_token_expiry": None,
},
}
@pytest.fixture
async def client(
aiohttp_client, mocker: MockerFixture, create_app_instance, mock_users_db_fixture
):
app = create_app_instance # Use the new fixture
# Directly set app["scheduler"] to a mock object # Directly set app["scheduler"] to a mock object
mock_scheduler_instance = mocker.MagicMock() mock_scheduler_instance = mocker.MagicMock()
mock_scheduler_instance.spawn = mocker.AsyncMock() mock_scheduler_instance.spawn = mocker.AsyncMock()
mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable mock_scheduler_instance.close = mocker.AsyncMock() # Ensure close is awaitable
app["scheduler"] = mock_scheduler_instance app["scheduler"] = mock_scheduler_instance
# Create temporary data files for testing # Create temporary data files for testing
@ -32,14 +73,176 @@ async def client(aiohttp_client, mocker: MockerFixture):
with open(config_file, "w") as f: with open(config_file, "w") as f:
json.dump({"price_per_gb": 0.0}, f) json.dump({"price_per_gb": 0.0}, f)
app["user_service"] = UserService(users_file) app["config_service"] = ConfigService(data_path / "config.json")
app["config_service"] = ConfigService(config_file)
yield await aiohttp_client(app) client = await aiohttp_client(app)
# Clean up temporary files # Access the real UserService instance and mock its methods
users_file.unlink(missing_ok=True) mock_user_service_instance = client.app["user_service"]
config_file.unlink(missing_ok=True) # Use missing_ok for robustness
# Use the mock_users_db_fixture
mock_users_db = mock_users_db_fixture
def mock_authenticate_user(email, password):
user = mock_users_db.get(email)
if user and user["password"] == password:
return user
return None
def mock_get_user_by_email(email):
return mock_users_db.get(email)
def mock_create_user(full_name, email, password, parent_email=None):
if email in mock_users_db:
raise ValueError("User with this email already exists")
new_user = {
"full_name": full_name,
"email": email,
"password": password,
"hashed_password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": parent_email,
"reset_token": None,
"reset_token_expiry": None,
}
mock_users_db[email] = new_user
return new_user
def mock_reset_password(email, token, new_password): # Added token argument
user = mock_users_db.get(email)
if user and user.get("reset_token") == token and user.get("reset_token_expiry"):
expiry_time = datetime.datetime.fromisoformat(user["reset_token_expiry"])
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
user["password"] = new_password
user["hashed_password"] = "new_hashed_password" # Simulate hashing
user["reset_token"] = None
user["reset_token_expiry"] = None
return True
return False
def mock_generate_reset_token(email):
user = mock_users_db.get(email)
if user:
# In a real scenario, this would generate a unique token and expiry
user["reset_token"] = "test_token"
user["reset_token_expiry"] = "2030-11-08T20:00:00Z" # A future date
return "test_token"
return None
def mock_validate_reset_token(email, token):
if (
token == "expiredtoken123"
): # Explicitly handle the expired token from the test
return False
user = mock_users_db.get(email)
if user and user.get("reset_token") == token and user.get("reset_token_expiry"):
expiry_time = datetime.datetime.fromisoformat(
user["reset_token_expiry"]
)
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
return True
return False
def mock_save_users():
# This mock ensures that changes to user objects within tests are reflected in mock_users_db
# In a real scenario, this would write to a file or database.
pass # The mock_users_db is already being modified directly by other mocks
def mock_get_all_users():
return list(mock_users_db.values())
def mock_get_users_by_parent_email(parent_email):
return [
user
for user in mock_users_db.values()
if user.get("parent_email") == parent_email
]
def mock_delete_user(email):
if email in mock_users_db:
del mock_users_db[email]
return True
return False
def mock_delete_users_by_parent_email(parent_email):
initial_count = len(mock_users_db)
users_to_delete = [
email
for email, user in mock_users_db.items()
if user.get("parent_email") == parent_email
]
for email in users_to_delete:
del mock_users_db[email]
return initial_count - len(mock_users_db)
mocker.patch.object(
mock_user_service_instance,
"authenticate_user",
side_effect=mock_authenticate_user,
)
mocker.patch.object(
mock_user_service_instance,
"get_user_by_email",
side_effect=mock_get_user_by_email,
)
mocker.patch.object(
mock_user_service_instance, "create_user", side_effect=mock_create_user
)
mocker.patch.object(
mock_user_service_instance, "get_all_users", side_effect=mock_get_all_users
)
mocker.patch.object(
mock_user_service_instance, "update_user_quota", return_value=None
) # Keep as is for now
mocker.patch.object(
mock_user_service_instance, "delete_user", side_effect=mock_delete_user
)
mocker.patch.object(
mock_user_service_instance,
"get_users_by_parent_email",
side_effect=mock_get_users_by_parent_email,
)
mocker.patch.object(
mock_user_service_instance,
"delete_users_by_parent_email",
side_effect=mock_delete_users_by_parent_email,
)
mocker.patch.object(
mock_user_service_instance,
"generate_reset_token",
side_effect=mock_generate_reset_token,
)
mocker.patch.object(
mock_user_service_instance,
"get_user_by_reset_token",
side_effect=lambda token: next(
(
user
for user in mock_users_db.values()
if user.get("reset_token") == token
),
None,
),
)
mocker.patch.object(
mock_user_service_instance, "reset_password", side_effect=mock_reset_password
)
mocker.patch.object(
mock_user_service_instance,
"validate_reset_token",
side_effect=mock_validate_reset_token,
)
mocker.patch.object(
mock_user_service_instance, "_save_users", side_effect=mock_save_users
)
try:
yield client
finally:
# Clean up temporary files
users_file.unlink(missing_ok=True)
config_file.unlink(missing_ok=True) # Use missing_ok for robustness
@pytest.fixture @pytest.fixture

336
tests/test_admin.py Normal file
View File

@ -0,0 +1,336 @@
import pytest
from aiohttp import web
from aiohttp_session import get_session
from unittest.mock import AsyncMock, patch, call
from retoors.services.user_service import UserService
@pytest.fixture
async def admin_client(client):
"""Fixture to provide an aiohttp client with mocked services."""
return client
@pytest.fixture
async def logged_in_admin_client(admin_client):
# Get the mocked user_service from the app
mock_user_service = admin_client.app["user_service"]
resp = await admin_client.post(
"/login", data={"email": "admin@example.com", "password": "password"}, allow_redirects=False
)
if resp.status != 302:
print(f"Login failed with status {resp.status}. Response text: {await resp.text()}")
assert resp.status == 302 # Expecting a redirect after successful login
# Assert that authenticate_user was called on the mock
mock_user_service.authenticate_user.assert_called_once_with("admin@example.com", "password")
# The client will not follow the redirect, so the session cookie will be set on the client
return admin_client, mock_user_service # Return both client and mock_user_service
async def test_get_users_unauthorized(admin_client):
resp = await admin_client.get("/api/users")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_get_users_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.get("/api/users")
assert resp.status == 200
data = await resp.json()
assert "users" in data
assert len(data["users"]) == 2
assert data["users"][0]["email"] == "admin@example.com"
assert data["users"][1]["email"] == "child1@example.com"
mock_user_service.get_all_users.assert_called_once()
async def test_add_user_unauthorized(admin_client):
resp = await admin_client.post("/api/users", json={
"full_name": "New User",
"email": "new@example.com",
"password": "password123"
})
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_add_user_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.post("/api/users", json={
"full_name": "New User",
"email": "new@example.com",
"password": "password123"
})
assert resp.status == 201
data = await resp.json()
assert data["message"] == "User added successfully"
assert data["user"]["email"] == "new@example.com"
mock_user_service.create_user.assert_called_once_with(
full_name="New User",
email="new@example.com",
password="password123",
parent_email="admin@example.com"
)
async def test_add_user_invalid_data(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.post("/api/users", json={
"full_name": "Nu", # Too short
"email": "invalid-email",
"password": "123" # Too short
})
assert resp.status == 400
data = await resp.json()
assert "3 validation errors for RegistrationModel" in data["error"]
async def test_add_user_email_exists(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.create_user.side_effect = ValueError("User with this email already exists")
resp = await client.post("/api/users", json={
"full_name": "Existing User",
"email": "admin@example.com",
"password": "password123"
})
assert resp.status == 400
data = await resp.json()
assert data["error"] == "User with this email already exists"
async def test_update_user_quota_unauthorized(admin_client):
resp = await admin_client.put("/api/users/child1@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_update_user_quota_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.put("/api/users/child1@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Quota for child1@example.com updated successfully"
mock_user_service.update_user_quota.assert_called_once_with("child1@example.com", 200)
async def test_update_user_quota_self(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.put("/api/users/admin@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Quota for admin@example.com updated successfully"
mock_user_service.update_user_quota.assert_called_once_with("admin@example.com", 200)
async def test_update_user_quota_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child and not self
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.put("/api/users/other@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to update this user's quota"
async def test_update_user_quota_user_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
resp = await client.put("/api/users/nonexistent@example.com/quota", json={
"new_quota_gb": 200
})
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to update this user's quota"
async def test_delete_user_unauthorized(admin_client):
resp = await admin_client.delete("/api/users/child1@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_delete_user_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/users/child1@example.com")
assert resp.status == 200
data = await resp.json()
assert data["message"] == "User child1@example.com deleted successfully"
mock_user_service.delete_user.assert_called_once_with("child1@example.com")
async def test_delete_user_self_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/users/admin@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You cannot delete your own account from this interface"
async def test_delete_user_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.delete("/api/users/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this user"
async def test_delete_user_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
mock_user_service.delete_user.return_value = False
resp = await client.delete("/api/users/nonexistent@example.com")
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this user"
async def test_get_user_details_unauthorized(admin_client):
resp = await admin_client.get("/api/users/child1@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_get_user_details_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.reset_mock() # Reset mock call count
resp = await client.get("/api/users/child1@example.com")
assert resp.status == 200
data = await resp.json()
assert "user" in data
assert data["user"]["email"] == "child1@example.com"
assert "password" not in data["user"] # Ensure sensitive data is not returned
mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("child1@example.com")])
async def test_get_user_details_self(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.reset_mock() # Reset mock call count
resp = await client.get("/api/users/admin@example.com")
assert resp.status == 200
data = await resp.json()
assert "user" in data
assert data["user"]["email"] == "admin@example.com"
mock_user_service.get_user_by_email.assert_has_calls([call("admin@example.com"), call("admin@example.com")])
async def test_get_user_details_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
{ # For target_user_email check, not a child and not self
"full_name": "Other User",
"email": "other@example.com",
"password": "hashed_password",
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "another@example.com"
}
]
resp = await client.get("/api/users/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to view this user's details"
async def test_get_user_details_not_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.get_user_by_email.side_effect = [
{ # For current_user_email check
"full_name": "Admin User",
"email": "admin@example.com",
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 10,
"parent_email": None
},
None # For target_user_email check
]
resp = await client.get("/api/users/nonexistent@example.com")
assert resp.status == 403 # Forbidden because user not found
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to view this user's details"
async def test_delete_team_unauthorized(admin_client):
resp = await admin_client.delete("/api/teams/admin@example.com")
assert resp.status == 401
data = await resp.json()
assert data["error"] == "Unauthorized"
async def test_delete_team_authorized(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/teams/admin@example.com")
assert resp.status == 200
data = await resp.json()
assert data["message"] == "Successfully deleted 1 users from the team managed by admin@example.com"
mock_user_service.delete_users_by_parent_email.assert_called_once_with("admin@example.com")
async def test_delete_team_forbidden(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
resp = await client.delete("/api/teams/other@example.com")
assert resp.status == 403
data = await resp.json()
assert data["error"] == "Forbidden: You do not have permission to delete this team"
async def test_delete_team_no_users_found(logged_in_admin_client):
client, mock_user_service = logged_in_admin_client
mock_user_service.delete_users_by_parent_email.side_effect = lambda parent_email: 0
resp = await client.delete("/api/teams/admin@example.com")
assert resp.status == 404
data = await resp.json()
assert data["message"] == "No users found for team managed by admin@example.com or could not be deleted"

View File

@ -319,7 +319,7 @@ async def test_reset_password_post_invalid_token(client):
assert user_service.authenticate_user("test@example.com", "old_password") assert user_service.authenticate_user("test@example.com", "old_password")
async def test_reset_password_post_expired_token(client): async def test_reset_password_post_expired_token(client, mock_users_db_fixture):
user_service = client.app["user_service"] user_service = client.app["user_service"]
await client.post( await client.post(
"/register", "/register",
@ -335,7 +335,7 @@ async def test_reset_password_post_expired_token(client):
token = "expiredtoken123" token = "expiredtoken123"
user["reset_token"] = token user["reset_token"] = token
user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat() user["reset_token_expiry"] = (datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)).isoformat()
user_service._save_users() # Save the expired token mock_users_db_fixture["test@example.com"] = user # Directly update the mock_users_db_fixture
resp = await client.post( resp = await client.post(
f"/reset_password/{token}", f"/reset_password/{token}",