import pytest from unittest.mock import MagicMock from pathlib import Path import json import datetime import aiohttp from aiohttp.test_utils import TestClient from aiohttp_session import setup as setup_session from aiohttp_session.cookie_storage import EncryptedCookieStorage from retoors.helpers.env_manager import get_or_create_session_secret_key # Assuming the FileService is in retoors/services/file_service.py # and the FileBrowserView is in retoors/views/site.py @pytest.fixture def temp_user_files_dir(tmp_path): """Fixture to create a temporary directory for user files.""" user_files_dir = tmp_path / "user_files" user_files_dir.mkdir() return user_files_dir @pytest.fixture def temp_users_json(tmp_path): """Fixture to create a temporary users.json file.""" users_json_path = tmp_path / "users.json" initial_users_data = { "test@example.com": { "full_name": "Test User", "email": "test@example.com", "password": "hashed_password", "storage_quota_gb": 10, "storage_used_gb": 0, "parent_email": None, "shared_items": {} }, "child@example.com": { "full_name": "Child User", "email": "child@example.com", "password": "hashed_password", "storage_quota_gb": 5, "storage_used_gb": 0, "parent_email": "test@example.com", "shared_items": {} } } with open(users_json_path, "w") as f: json.dump(initial_users_data, f) return users_json_path @pytest.fixture def file_service_instance(temp_user_files_dir, temp_users_json): """Fixture to provide a FileService instance with temporary directories.""" from retoors.services.file_service import FileService return FileService(temp_user_files_dir, temp_users_json) @pytest.fixture async def logged_in_client(aiohttp_client, create_test_app, mocker): """Fixture to provide an aiohttp client with a logged-in user.""" app = create_test_app client = await aiohttp_client(app) user_service = app["user_service"] def mock_create_user(full_name, email, password, parent_email=None): return { "full_name": full_name, "email": email, "password": "hashed_password", "storage_quota_gb": 10, "storage_used_gb": 0, "parent_email": parent_email, "shared_items": {} } def mock_authenticate_user(email, password): return { "email": email, "full_name": "Test User", "is_admin": False, "storage_quota_gb": 10, "storage_used_gb": 0 } def mock_get_user_by_email(email): return { "email": email, "full_name": "Test User", "is_admin": False, "storage_quota_gb": 10, "storage_used_gb": 0 } mocker.patch.object(user_service, "create_user", side_effect=mock_create_user) mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user) mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email) await client.post( "/register", data={ "full_name": "Test User", "email": "test@example.com", "password": "password", "confirm_password": "password", }, ) await client.post( "/login", data={"email": "test@example.com", "password": "password"} ) return client @pytest.fixture async def logged_in_admin_client(aiohttp_client, create_test_app, mocker): """Fixture to provide an aiohttp client with a logged-in admin user.""" app = create_test_app client = await aiohttp_client(app) user_service = app["user_service"] def mock_create_user(full_name, email, password, parent_email=None): return { "full_name": full_name, "email": email, "password": "hashed_password", "storage_quota_gb": 100, "storage_used_gb": 0, "parent_email": parent_email, "shared_items": {} } def mock_authenticate_user(email, password): return { "email": email, "full_name": "Admin User", "is_admin": True, "storage_quota_gb": 100, "storage_used_gb": 0 } def mock_get_user_by_email(email): return { "email": email, "full_name": "Admin User", "is_admin": True, "storage_quota_gb": 100, "storage_used_gb": 0 } mocker.patch.object(user_service, "create_user", side_effect=mock_create_user) mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user) mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email) await client.post( "/register", data={ "full_name": "Admin User", "email": "admin@example.com", "password": "password", "confirm_password": "password", }, ) await client.post( "/login", data={"email": "admin@example.com", "password": "password"} ) return client # --- FileService Tests --- @pytest.mark.asyncio async def test_file_service_list_files_empty(file_service_instance): user_email = "test@example.com" files = await file_service_instance.list_files(user_email) assert files == [] @pytest.mark.asyncio async def test_file_service_create_folder(file_service_instance, temp_user_files_dir): user_email = "test@example.com" folder_name = "my_new_folder" success = await file_service_instance.create_folder(user_email, folder_name) assert success expected_path = temp_user_files_dir / user_email / folder_name assert expected_path.is_dir() @pytest.mark.asyncio async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir): user_email = "test@example.com" folder_name = "existing_folder" (temp_user_files_dir / user_email).mkdir(parents=True) (temp_user_files_dir / user_email / folder_name).mkdir(parents=True) success = await file_service_instance.create_folder(user_email, folder_name) assert not success # Should return False if folder already exists @pytest.mark.asyncio async def test_file_service_upload_file(file_service_instance, temp_user_files_dir): user_email = "test@example.com" file_name = "document.txt" file_content = b"Hello, world!" success = await file_service_instance.upload_file(user_email, file_name, file_content) assert success expected_path = temp_user_files_dir / user_email / file_name assert expected_path.is_file() assert expected_path.read_bytes() == file_content @pytest.mark.asyncio async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir): user_email = "test@example.com" await file_service_instance.create_folder(user_email, "folder1") await file_service_instance.upload_file(user_email, "file1.txt", b"content1") await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2") files = await file_service_instance.list_files(user_email) assert len(files) == 2 assert any(f["name"] == "folder1" and f["is_dir"] for f in files) assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files) files_in_folder1 = await file_service_instance.list_files(user_email, "folder1") assert len(files_in_folder1) == 1 assert files_in_folder1[0]["name"] == "file2.txt" @pytest.mark.asyncio async def test_file_service_download_file(file_service_instance, temp_user_files_dir): user_email = "test@example.com" file_name = "download.txt" file_content = b"Downloadable content." await file_service_instance.upload_file(user_email, file_name, file_content) content, name = await file_service_instance.download_file(user_email, file_name) assert content == file_content assert name == file_name @pytest.mark.asyncio async def test_file_service_download_file_not_found(file_service_instance): user_email = "test@example.com" content = await file_service_instance.download_file(user_email, "nonexistent.txt") assert content is None @pytest.mark.asyncio async def test_file_service_delete_file(file_service_instance, temp_user_files_dir): user_email = "test@example.com" file_name = "to_delete.txt" (temp_user_files_dir / user_email).mkdir(exist_ok=True) (temp_user_files_dir / user_email / file_name).write_bytes(b"delete me") success = await file_service_instance.delete_item(user_email, file_name) assert success assert not (temp_user_files_dir / user_email / file_name).exists() @pytest.mark.asyncio async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir): user_email = "test@example.com" folder_name = "folder_to_delete" (temp_user_files_dir / user_email).mkdir(parents=True) (temp_user_files_dir / user_email / folder_name).mkdir(parents=True) (temp_user_files_dir / user_email / folder_name / "nested.txt").write_bytes(b"nested") success = await file_service_instance.delete_item(user_email, folder_name) assert success assert not (temp_user_files_dir / user_email / folder_name).exists() @pytest.mark.asyncio async def test_file_service_delete_nonexistent(file_service_instance): user_email = "test@example.com" success = await file_service_instance.delete_item(user_email, "nonexistent_item") assert not success @pytest.mark.asyncio async def test_file_service_generate_share_link(file_service_instance): user_email = "test@example.com" file_path = "shareable.txt" await file_service_instance.upload_file(user_email, file_path, b"share content") share_id = await file_service_instance.generate_share_link(user_email, file_path) assert share_id is not None assert isinstance(share_id, str) shared_item = await file_service_instance.get_shared_item(share_id) assert shared_item is not None assert shared_item["user_email"] == user_email assert shared_item["item_path"] == file_path @pytest.mark.asyncio async def test_file_service_get_shared_file_content(file_service_instance): user_email = "test@example.com" file_path = "shared_file.txt" content_to_share = b"This is shared content." await file_service_instance.upload_file(user_email, file_path, content_to_share) share_id = await file_service_instance.generate_share_link(user_email, file_path) retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id) assert retrieved_content == content_to_share assert filename == "shared_file.txt" @pytest.mark.asyncio async def test_file_service_get_shared_folder_content(file_service_instance): user_email = "test@example.com" folder_path = "shared_folder" await file_service_instance.create_folder(user_email, folder_path) await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content") share_id = await file_service_instance.generate_share_link(user_email, folder_path) retrieved_content = await file_service_instance.get_shared_folder_content(share_id) assert len(retrieved_content) == 1 assert retrieved_content[0]["name"] == "nested.txt" @pytest.mark.asyncio async def test_file_service_shared_link_expiry(file_service_instance, mocker): user_email = "test@example.com" file_path = "expiring_file.txt" await file_service_instance.upload_file(user_email, file_path, b"expiring content") share_id = await file_service_instance.generate_share_link(user_email, file_path) # Mock datetime to simulate an expired link future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8) mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime)) datetime.datetime.now.return_value = future_time shared_item = await file_service_instance.get_shared_item(share_id) assert shared_item is None # --- FileBrowserView Tests --- @pytest.mark.asyncio async def test_file_browser_get_unauthorized(client: TestClient): resp = await client.get("/files", allow_redirects=False) assert resp.status == 302 # Redirect to login @pytest.mark.asyncio async def test_file_browser_get_authorized_empty(logged_in_client: TestClient): resp = await logged_in_client.get("/files") assert resp.status == 200 text = await resp.text() assert "No files found in this directory." in text @pytest.mark.asyncio async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance): user_email = "test@example.com" await file_service_instance.create_folder(user_email, "my_folder") await file_service_instance.upload_file(user_email, "my_file.txt", b"some content") resp = await logged_in_client.get("/files") assert resp.status == 200 text = await resp.text() assert "my_folder" in text assert "my_file.txt" in text @pytest.mark.asyncio async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): user_email = "test@example.com" resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False) assert resp.status == 302 # Redirect assert resp.headers["Location"].startswith("/files") expected_path = temp_user_files_dir / user_email / "new_folder_via_web" assert expected_path.is_dir() @pytest.mark.asyncio async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): user_email = "test@example.com" file_content = b"Uploaded content from web." from io import BytesIO data = aiohttp.FormData() data.add_field('file', BytesIO(file_content), filename='uploaded.txt', content_type='text/plain') resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False) assert resp.status == 302 # Redirect assert resp.headers["Location"].startswith("/files") expected_path = temp_user_files_dir / user_email / "uploaded.txt" assert expected_path.is_file() assert expected_path.read_bytes() == file_content @pytest.mark.asyncio async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance): user_email = "test@example.com" file_name = "web_download.txt" file_content = b"Content to be downloaded via web." await file_service_instance.upload_file(user_email, file_name, file_content) resp = await logged_in_client.get(f"/files/download/{file_name}") assert resp.status == 200 assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}" assert await resp.read() == file_content @pytest.mark.asyncio async def test_file_browser_download_file_not_found(logged_in_client: TestClient): resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False) assert resp.status == 404 @pytest.mark.asyncio async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): user_email = "test@example.com" file_name = "web_delete.txt" await file_service_instance.upload_file(user_email, file_name, b"delete this") expected_path = temp_user_files_dir / user_email / file_name assert expected_path.is_file() resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False) assert resp.status == 302 # Redirect assert resp.headers["Location"].startswith("/files") assert not expected_path.is_file() @pytest.mark.asyncio async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): user_email = "test@example.com" folder_name = "web_delete_folder" await file_service_instance.create_folder(user_email, folder_name) await file_service_instance.upload_file(user_email, f"{folder_name}/nested.txt", b"nested") expected_path = temp_user_files_dir / user_email / folder_name assert expected_path.is_dir() resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False) assert resp.status == 302 # Redirect assert resp.headers["Location"].startswith("/files") assert not expected_path.is_dir() @pytest.mark.asyncio async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance): user_email = "test@example.com" file_name = "web_share.txt" await file_service_instance.upload_file(user_email, file_name, b"shareable content") resp = await logged_in_client.post(f"/files/share/{file_name}") assert resp.status == 200 data = await resp.json() assert "share_link" in data assert "http" in data["share_link"] # Check if it's a URL # Verify the shared item can be retrieved # The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID share_id = data["share_link"].split("/")[-1] shared_item = await file_service_instance.get_shared_item(share_id) assert shared_item is not None assert shared_item["item_path"] == file_name @pytest.fixture def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance): """Fixture to create a test aiohttp application with mocked services.""" from aiohttp import web from retoors.middlewares import user_middleware, error_middleware from retoors.services.user_service import UserService from retoors.routes import setup_routes import aiohttp_jinja2 import jinja2 app = web.Application() # Setup session for the test app project_root = Path(__file__).parent.parent env_file_path = project_root / ".env" secret_key = get_or_create_session_secret_key(env_file_path) setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8"))) app.middlewares.append(error_middleware) app.middlewares.append(user_middleware) # Mock UserService mock_user_service = mocker.MagicMock(spec=UserService) # Mock scheduler mock_scheduler = mocker.MagicMock() mock_scheduler.spawn = mocker.AsyncMock() mock_scheduler.close = mocker.AsyncMock() app["user_service"] = mock_user_service app["file_service"] = file_service_instance app["scheduler"] = mock_scheduler # Setup Jinja2 for templates base_path = Path(__file__).parent.parent / "retoors" templates_path = base_path / "templates" aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path))) setup_routes(app) return app