|
import pytest
|
|
from unittest.mock import MagicMock
|
|
from pathlib import Path
|
|
import json
|
|
import datetime
|
|
import aiohttp
|
|
from aiohttp.test_utils import TestClient
|
|
from aiohttp_session import setup as setup_session
|
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
|
from retoors.helpers.env_manager import get_or_create_session_secret_key
|
|
|
|
# Assuming the FileService is in retoors/services/file_service.py
|
|
# and the FileBrowserView is in retoors/views/site.py
|
|
|
|
@pytest.fixture
def temp_user_files_dir(tmp_path):
    """Create an empty ``user_files`` directory under ``tmp_path`` and return it."""
    storage_root = tmp_path / "user_files"
    storage_root.mkdir()
    return storage_root
|
|
|
|
@pytest.fixture
def temp_users_json(tmp_path):
    """Write a two-user ``users.json`` under ``tmp_path`` and return its path.

    The file contains a parent account (``test@example.com``) and a child
    account (``child@example.com``) whose ``parent_email`` refers to the
    parent. Both start with zero storage used and no shared items.
    """
    users_json_path = tmp_path / "users.json"
    initial_users_data = [
        {
            "email": "test@example.com",
            "full_name": "Test User",
            "password": "hashed_password",
            "storage_quota_gb": 10,
            "storage_used_gb": 0,
            "parent_email": None,
            "shared_items": {},
        },
        {
            # The "email" key used to be listed twice in this literal; the
            # redundant entry is gone, the serialized JSON is unchanged.
            "email": "child@example.com",
            "full_name": "Child User",
            "password": "hashed_password",
            "storage_quota_gb": 5,
            "storage_used_gb": 0,
            "parent_email": "test@example.com",
            "shared_items": {},
        },
    ]
    # Explicit encoding keeps the fixture independent of the platform default.
    with open(users_json_path, "w", encoding="utf-8") as f:
        json.dump(initial_users_data, f)
    return users_json_path
|
|
|
|
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
    """Build a ``FileService`` from the temporary files directory and users file."""
    # The import is deliberately kept inside the fixture body.
    from retoors.services.file_service import FileService

    service = FileService(temp_user_files_dir, temp_users_json)
    return service
|
|
|
|
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
    """Return a test client that has posted to ``/register`` and ``/login``.

    Before the requests are sent, ``create_user``, ``authenticate_user`` and
    ``get_user_by_email`` on the app's ``user_service`` are patched to return
    canned, non-admin user dictionaries. The responses of the two POSTs are
    not inspected.
    """
    app = create_test_app
    client = await aiohttp_client(app)
    user_service = app["user_service"]

    def _profile(email):
        # A fresh dict per call so no two callers share state.
        return {
            "email": email,
            "full_name": "Test User",
            "is_admin": False,
            "storage_quota_gb": 10,
            "storage_used_gb": 0,
        }

    def fake_create_user(full_name, email, password, parent_email=None):
        return {
            "full_name": full_name,
            "email": email,
            "password": "hashed_password",
            "storage_quota_gb": 10,
            "storage_used_gb": 0,
            "parent_email": parent_email,
            "shared_items": {},
        }

    def fake_authenticate_user(email, password):
        return _profile(email)

    def fake_get_user_by_email(email):
        return _profile(email)

    replacements = {
        "create_user": fake_create_user,
        "authenticate_user": fake_authenticate_user,
        "get_user_by_email": fake_get_user_by_email,
    }
    for method_name, replacement in replacements.items():
        mocker.patch.object(user_service, method_name, side_effect=replacement)

    registration_form = {
        "full_name": "Test User",
        "email": "test@example.com",
        "password": "password",
        "confirm_password": "password",
    }
    login_form = {"email": "test@example.com", "password": "password"}

    await client.post("/register", data=registration_form)
    await client.post("/login", data=login_form)

    return client
|
|
|
|
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
    """Return a test client that has posted to ``/register`` and ``/login`` as an admin.

    Before the requests are sent, ``create_user``, ``authenticate_user`` and
    ``get_user_by_email`` on the app's ``user_service`` are patched to return
    canned user dictionaries with ``is_admin`` set to ``True``. The responses
    of the two POSTs are not inspected.
    """
    app = create_test_app
    client = await aiohttp_client(app)
    user_service = app["user_service"]

    def _admin_profile(email):
        # A fresh dict per call so no two callers share state.
        return {
            "email": email,
            "full_name": "Admin User",
            "is_admin": True,
            "storage_quota_gb": 100,
            "storage_used_gb": 0,
        }

    def fake_create_user(full_name, email, password, parent_email=None):
        return {
            "full_name": full_name,
            "email": email,
            "password": "hashed_password",
            "storage_quota_gb": 100,
            "storage_used_gb": 0,
            "parent_email": parent_email,
            "shared_items": {},
        }

    def fake_authenticate_user(email, password):
        return _admin_profile(email)

    def fake_get_user_by_email(email):
        return _admin_profile(email)

    replacements = {
        "create_user": fake_create_user,
        "authenticate_user": fake_authenticate_user,
        "get_user_by_email": fake_get_user_by_email,
    }
    for method_name, replacement in replacements.items():
        mocker.patch.object(user_service, method_name, side_effect=replacement)

    registration_form = {
        "full_name": "Admin User",
        "email": "admin@example.com",
        "password": "password",
        "confirm_password": "password",
    }
    login_form = {"email": "admin@example.com", "password": "password"}

    await client.post("/register", data=registration_form)
    await client.post("/login", data=login_form)
    return client
|
|
|
|
|
|
# --- FileService Tests ---
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_list_files_empty(file_service_instance):
    """Listing files for a user with nothing stored yields an empty list."""
    listing = await file_service_instance.list_files("test@example.com")
    assert listing == []
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_create_folder(file_service_instance, temp_user_files_dir):
    """``create_folder`` reports success and the directory exists under the user's root."""
    owner = "test@example.com"
    new_dir = "my_new_folder"

    created = await file_service_instance.create_folder(owner, new_dir)

    assert created
    assert (temp_user_files_dir / owner / new_dir).is_dir()
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir):
    """``create_folder`` returns a falsy result when the folder is already on disk."""
    owner = "test@example.com"
    existing = "existing_folder"

    # Pre-create the user's root and then the folder itself.
    owner_root = temp_user_files_dir / owner
    owner_root.mkdir(parents=True)
    (owner_root / existing).mkdir(parents=True)

    created = await file_service_instance.create_folder(owner, existing)
    assert not created  # Should return False if folder already exists
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_upload_file(file_service_instance, temp_user_files_dir):
    """``upload_file`` reports success and writes the exact bytes under the user's root."""
    owner = "test@example.com"
    target_name = "document.txt"
    payload = b"Hello, world!"

    uploaded = await file_service_instance.upload_file(owner, target_name, payload)
    assert uploaded

    stored = temp_user_files_dir / owner / target_name
    assert stored.is_file()
    assert stored.read_bytes() == payload
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir):
    """``list_files`` returns only the entries directly inside the requested directory."""
    owner = "test@example.com"
    await file_service_instance.create_folder(owner, "folder1")
    await file_service_instance.upload_file(owner, "file1.txt", b"content1")
    await file_service_instance.upload_file(owner, "folder1/file2.txt", b"content2")

    # Top level: one folder and one file; the nested file is not counted here.
    top_level = await file_service_instance.list_files(owner)
    assert len(top_level) == 2
    assert any(entry["name"] == "folder1" and entry["is_dir"] for entry in top_level)
    assert any(entry["name"] == "file1.txt" and not entry["is_dir"] for entry in top_level)

    # Inside folder1: just the nested file.
    nested = await file_service_instance.list_files(owner, "folder1")
    assert len(nested) == 1
    assert nested[0]["name"] == "file2.txt"
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_download_file(file_service_instance, temp_user_files_dir):
    """``download_file`` returns the uploaded bytes together with the file name."""
    owner = "test@example.com"
    target_name = "download.txt"
    payload = b"Downloadable content."
    await file_service_instance.upload_file(owner, target_name, payload)

    returned_bytes, returned_name = await file_service_instance.download_file(owner, target_name)

    assert returned_bytes == payload
    assert returned_name == target_name
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_download_file_not_found(file_service_instance):
    """``download_file`` returns ``None`` for a path that was never uploaded."""
    result = await file_service_instance.download_file("test@example.com", "nonexistent.txt")
    assert result is None
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_delete_file(file_service_instance, temp_user_files_dir):
    """``delete_item`` reports success and removes a file written directly to disk."""
    owner = "test@example.com"
    doomed_name = "to_delete.txt"

    # Seed the file on disk without going through the service.
    owner_root = temp_user_files_dir / owner
    owner_root.mkdir(exist_ok=True)
    doomed_path = owner_root / doomed_name
    doomed_path.write_bytes(b"delete me")

    removed = await file_service_instance.delete_item(owner, doomed_name)

    assert removed
    assert not doomed_path.exists()
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir):
    """``delete_item`` reports success and removes a folder that still contains a file."""
    owner = "test@example.com"
    doomed_name = "folder_to_delete"

    # Seed a non-empty folder on disk without going through the service.
    owner_root = temp_user_files_dir / owner
    owner_root.mkdir(parents=True)
    doomed_dir = owner_root / doomed_name
    doomed_dir.mkdir(parents=True)
    (doomed_dir / "nested.txt").write_bytes(b"nested")

    removed = await file_service_instance.delete_item(owner, doomed_name)

    assert removed
    assert not doomed_dir.exists()
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_delete_nonexistent(file_service_instance):
    """``delete_item`` returns a falsy result for an item that does not exist."""
    removed = await file_service_instance.delete_item("test@example.com", "nonexistent_item")
    assert not removed
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_generate_share_link(file_service_instance):
    """``generate_share_link`` yields a string id that ``get_shared_item`` resolves back."""
    owner = "test@example.com"
    item = "shareable.txt"
    await file_service_instance.upload_file(owner, item, b"share content")

    share_id = await file_service_instance.generate_share_link(owner, item)
    assert share_id is not None
    assert isinstance(share_id, str)

    # The stored record must point back at the owner and the shared path.
    record = await file_service_instance.get_shared_item(share_id)
    assert record is not None
    assert record["user_email"] == owner
    assert record["item_path"] == item
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_get_shared_file_content(file_service_instance):
    """``get_shared_file_content`` returns the shared file's bytes and its file name."""
    owner = "test@example.com"
    item = "shared_file.txt"
    payload = b"This is shared content."
    await file_service_instance.upload_file(owner, item, payload)
    share_id = await file_service_instance.generate_share_link(owner, item)

    returned_bytes, returned_name = await file_service_instance.get_shared_file_content(share_id)

    assert returned_bytes == payload
    assert returned_name == "shared_file.txt"
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_get_shared_folder_content(file_service_instance):
    """``get_shared_folder_content`` lists the single file inside a shared folder."""
    owner = "test@example.com"
    folder = "shared_folder"
    await file_service_instance.create_folder(owner, folder)
    await file_service_instance.upload_file(owner, f"{folder}/nested.txt", b"nested content")
    share_id = await file_service_instance.generate_share_link(owner, folder)

    entries = await file_service_instance.get_shared_folder_content(share_id)

    assert len(entries) == 1
    assert entries[0]["name"] == "nested.txt"
|
|
|
|
@pytest.mark.asyncio
async def test_file_service_shared_link_expiry(file_service_instance, mocker):
    """A share link no longer resolves once the service's clock reads eight days later."""
    owner = "test@example.com"
    item = "expiring_file.txt"
    await file_service_instance.upload_file(owner, item, b"expiring content")
    share_id = await file_service_instance.generate_share_link(owner, item)

    # Patch the service's ``datetime`` only after the link exists, so the link
    # itself was generated against the real clock.
    eight_days_on = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
    patched_module = mocker.patch("retoors.services.file_service.datetime", wraps=datetime)
    patched_module.datetime.now = mocker.Mock(return_value=eight_days_on)

    record = await file_service_instance.get_shared_item(share_id)
    assert record is None
|
|
|
|
|
|
# --- FileBrowserView Tests ---
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_get_unauthorized(client: TestClient):
    """GET ``/files`` with the plain ``client`` fixture is answered with a 302."""
    response = await client.get("/files", allow_redirects=False)
    assert response.status == 302  # Redirect to login
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_get_authorized_empty(logged_in_client: TestClient):
    """A logged-in GET of ``/files`` with no files shows the empty-directory message."""
    response = await logged_in_client.get("/files")
    assert response.status == 200

    body = await response.text()
    assert "No files found in this directory." in body
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance):
    """A logged-in GET of ``/files`` mentions the folder and file created beforehand."""
    owner = "test@example.com"
    await file_service_instance.create_folder(owner, "my_folder")
    await file_service_instance.upload_file(owner, "my_file.txt", b"some content")

    response = await logged_in_client.get("/files")
    assert response.status == 200

    body = await response.text()
    assert "my_folder" in body
    assert "my_file.txt" in body
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
    """POST ``/files/new_folder`` redirects to ``/files`` and creates the directory on disk."""
    owner = "test@example.com"
    form = {"folder_name": "new_folder_via_web"}

    response = await logged_in_client.post("/files/new_folder", data=form, allow_redirects=False)

    assert response.status == 302  # Redirect
    assert response.headers["Location"].startswith("/files")

    created_dir = temp_user_files_dir / owner / "new_folder_via_web"
    assert created_dir.is_dir()
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
    """POST ``/files/upload`` with a multipart file answers 200 and stores the bytes."""
    owner = "test@example.com"
    payload = b"Uploaded content from web."

    from io import BytesIO

    # Multipart form carrying a single ``file`` field.
    form = aiohttp.FormData()
    form.add_field(
        "file",
        BytesIO(payload),
        filename="uploaded.txt",
        content_type="text/plain",
    )

    response = await logged_in_client.post("/files/upload", data=form, allow_redirects=False)
    assert response.status == 200

    stored = temp_user_files_dir / owner / "uploaded.txt"
    assert stored.is_file()
    assert stored.read_bytes() == payload
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance):
    """GET ``/files/download/<name>`` returns the file as an attachment with its bytes."""
    owner = "test@example.com"
    target_name = "web_download.txt"
    payload = b"Content to be downloaded via web."
    await file_service_instance.upload_file(owner, target_name, payload)

    response = await logged_in_client.get(f"/files/download/{target_name}")

    assert response.status == 200
    assert response.headers["Content-Disposition"] == f"attachment; filename={target_name}"
    body = await response.read()
    assert body == payload
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
    """GET ``/files/download/<name>`` for a missing file answers 404."""
    response = await logged_in_client.get(
        "/files/download/nonexistent_web.txt",
        allow_redirects=False,
    )
    assert response.status == 404
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
    """POST ``/files/delete/<name>`` redirects to ``/files`` and removes the file."""
    owner = "test@example.com"
    doomed_name = "web_delete.txt"
    await file_service_instance.upload_file(owner, doomed_name, b"delete this")

    # Sanity check: the file is really there before the request.
    doomed_path = temp_user_files_dir / owner / doomed_name
    assert doomed_path.is_file()

    response = await logged_in_client.post(f"/files/delete/{doomed_name}", allow_redirects=False)

    assert response.status == 302  # Redirect
    assert response.headers["Location"].startswith("/files")
    assert not doomed_path.is_file()
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
    """POST ``/files/delete/<folder>`` redirects to ``/files`` and removes a non-empty folder."""
    owner = "test@example.com"
    doomed_name = "web_delete_folder"
    await file_service_instance.create_folder(owner, doomed_name)
    await file_service_instance.upload_file(owner, f"{doomed_name}/nested.txt", b"nested")

    # Sanity check: the folder is really there before the request.
    doomed_dir = temp_user_files_dir / owner / doomed_name
    assert doomed_dir.is_dir()

    response = await logged_in_client.post(f"/files/delete/{doomed_name}", allow_redirects=False)

    assert response.status == 302  # Redirect
    assert response.headers["Location"].startswith("/files")
    assert not doomed_dir.is_dir()
|
|
|
|
@pytest.mark.asyncio
async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance):
    """POST ``/files/share/<name>`` returns JSON with a ``share_link`` the service can resolve."""
    owner = "test@example.com"
    target_name = "web_share.txt"
    await file_service_instance.upload_file(owner, target_name, b"shareable content")

    response = await logged_in_client.post(f"/files/share/{target_name}")
    assert response.status == 200

    payload = await response.json()
    assert "share_link" in payload
    link = payload["share_link"]
    assert "http" in link  # Check if it's a URL

    # Verify the shared item can be retrieved.
    # The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID,
    # so the id is whatever follows the final slash.
    share_id = link.rsplit("/", 1)[-1]
    record = await file_service_instance.get_shared_item(share_id)
    assert record is not None
    assert record["item_path"] == target_name
|
|
|
|
@pytest.fixture
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
    """Assemble an aiohttp application with sessions, middlewares, templates and routes.

    ``app["user_service"]`` is a ``MagicMock`` specced on ``UserService``,
    ``app["scheduler"]`` is a mock with async ``spawn``/``close``, and
    ``app["file_service"]`` is the ``file_service_instance`` fixture.
    ``temp_user_files_dir`` and ``temp_users_json`` are requested but not
    used directly in this body.
    """
    from aiohttp import web
    from retoors.middlewares import user_middleware, error_middleware
    from retoors.services.user_service import UserService
    from retoors.routes import setup_routes
    import aiohttp_jinja2
    import jinja2

    # Two levels up from this test file; both the .env file and the package
    # directory are located relative to it.
    project_root = Path(__file__).parent.parent

    app = web.Application()

    # Setup session for the test app.
    # NOTE(review): the helper's name suggests it may create the key in the
    # project's .env file -- confirm this side effect is acceptable in tests.
    secret_key = get_or_create_session_secret_key(project_root / ".env")
    cookie_storage = EncryptedCookieStorage(secret_key.decode("utf-8"))
    setup_session(app, cookie_storage)

    # Registration order matters: error handling first, then user loading.
    for middleware in (error_middleware, user_middleware):
        app.middlewares.append(middleware)

    # Stand-in for the scheduler with awaitable spawn/close.
    scheduler_stub = mocker.MagicMock()
    scheduler_stub.spawn = mocker.AsyncMock()
    scheduler_stub.close = mocker.AsyncMock()

    app["user_service"] = mocker.MagicMock(spec=UserService)
    app["file_service"] = file_service_instance
    app["scheduler"] = scheduler_stub

    # Setup Jinja2 for templates
    templates_path = project_root / "retoors" / "templates"
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))

    setup_routes(app)
    return app
|