2025-11-08 23:57:08 +01:00
|
|
|
import pytest
|
|
|
|
|
from unittest.mock import MagicMock
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import json
|
|
|
|
|
import datetime
|
|
|
|
|
import aiohttp
|
2025-11-09 01:34:53 +01:00
|
|
|
from aiohttp import web
|
2025-11-08 23:57:08 +01:00
|
|
|
from aiohttp.test_utils import TestClient
|
|
|
|
|
from aiohttp_session import setup as setup_session
|
|
|
|
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
|
|
|
|
from retoors.helpers.env_manager import get_or_create_session_secret_key
|
|
|
|
|
|
|
|
|
|
# Assuming the FileService is in retoors/services/file_service.py
|
|
|
|
|
# and the FileBrowserView is in retoors/views/site.py
|
|
|
|
|
|
2025-11-09 02:14:26 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-11-08 23:57:08 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- FileService Tests ---
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_list_files_empty(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
files = await file_service_instance.list_files(user_email)
|
|
|
|
|
assert files == []
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_create_folder(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "my_new_folder"
|
|
|
|
|
success = await file_service_instance.create_folder(user_email, folder_name)
|
|
|
|
|
assert success
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / folder_name
|
|
|
|
|
assert expected_path.is_dir()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "existing_folder"
|
|
|
|
|
(temp_user_files_dir / user_email).mkdir(parents=True)
|
|
|
|
|
(temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
|
|
|
|
|
success = await file_service_instance.create_folder(user_email, folder_name)
|
|
|
|
|
assert not success # Should return False if folder already exists
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_upload_file(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "document.txt"
|
|
|
|
|
file_content = b"Hello, world!"
|
|
|
|
|
success = await file_service_instance.upload_file(user_email, file_name, file_content)
|
|
|
|
|
assert success
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / file_name
|
|
|
|
|
assert expected_path.is_file()
|
|
|
|
|
assert expected_path.read_bytes() == file_content
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
await file_service_instance.create_folder(user_email, "folder1")
|
|
|
|
|
await file_service_instance.upload_file(user_email, "file1.txt", b"content1")
|
|
|
|
|
await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2")
|
|
|
|
|
|
|
|
|
|
files = await file_service_instance.list_files(user_email)
|
|
|
|
|
assert len(files) == 2
|
|
|
|
|
assert any(f["name"] == "folder1" and f["is_dir"] for f in files)
|
|
|
|
|
assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files)
|
|
|
|
|
|
|
|
|
|
files_in_folder1 = await file_service_instance.list_files(user_email, "folder1")
|
|
|
|
|
assert len(files_in_folder1) == 1
|
|
|
|
|
assert files_in_folder1[0]["name"] == "file2.txt"
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_download_file(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "download.txt"
|
|
|
|
|
file_content = b"Downloadable content."
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_name, file_content)
|
|
|
|
|
|
|
|
|
|
content, name = await file_service_instance.download_file(user_email, file_name)
|
|
|
|
|
assert content == file_content
|
|
|
|
|
assert name == file_name
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_download_file_not_found(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
content = await file_service_instance.download_file(user_email, "nonexistent.txt")
|
|
|
|
|
assert content is None
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_delete_file(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "to_delete.txt"
|
|
|
|
|
(temp_user_files_dir / user_email).mkdir(exist_ok=True)
|
|
|
|
|
(temp_user_files_dir / user_email / file_name).write_bytes(b"delete me")
|
|
|
|
|
|
|
|
|
|
success = await file_service_instance.delete_item(user_email, file_name)
|
|
|
|
|
assert success
|
|
|
|
|
assert not (temp_user_files_dir / user_email / file_name).exists()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "folder_to_delete"
|
|
|
|
|
(temp_user_files_dir / user_email).mkdir(parents=True)
|
|
|
|
|
(temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
|
|
|
|
|
(temp_user_files_dir / user_email / folder_name / "nested.txt").write_bytes(b"nested")
|
|
|
|
|
|
|
|
|
|
success = await file_service_instance.delete_item(user_email, folder_name)
|
|
|
|
|
assert success
|
|
|
|
|
assert not (temp_user_files_dir / user_email / folder_name).exists()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_delete_nonexistent(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
success = await file_service_instance.delete_item(user_email, "nonexistent_item")
|
|
|
|
|
assert not success
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_generate_share_link(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_path = "shareable.txt"
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_path, b"share content")
|
|
|
|
|
|
|
|
|
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
|
|
|
|
assert share_id is not None
|
|
|
|
|
assert isinstance(share_id, str)
|
|
|
|
|
|
|
|
|
|
shared_item = await file_service_instance.get_shared_item(share_id)
|
|
|
|
|
assert shared_item is not None
|
|
|
|
|
assert shared_item["user_email"] == user_email
|
|
|
|
|
assert shared_item["item_path"] == file_path
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_get_shared_file_content(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_path = "shared_file.txt"
|
|
|
|
|
content_to_share = b"This is shared content."
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_path, content_to_share)
|
|
|
|
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
|
|
|
|
|
|
|
|
|
retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id)
|
|
|
|
|
assert retrieved_content == content_to_share
|
|
|
|
|
assert filename == "shared_file.txt"
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_get_shared_folder_content(file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_path = "shared_folder"
|
|
|
|
|
await file_service_instance.create_folder(user_email, folder_path)
|
|
|
|
|
await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content")
|
|
|
|
|
share_id = await file_service_instance.generate_share_link(user_email, folder_path)
|
|
|
|
|
|
|
|
|
|
retrieved_content = await file_service_instance.get_shared_folder_content(share_id)
|
|
|
|
|
assert len(retrieved_content) == 1
|
|
|
|
|
assert retrieved_content[0]["name"] == "nested.txt"
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_service_shared_link_expiry(file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_path = "expiring_file.txt"
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_path, b"expiring content")
|
|
|
|
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
|
|
|
|
|
2025-11-09 00:33:57 +01:00
|
|
|
# Mock datetime to simulate an expired link (after generating the link)
|
2025-11-08 23:57:08 +01:00
|
|
|
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
|
2025-11-09 00:33:57 +01:00
|
|
|
mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime)
|
|
|
|
|
mock_datetime.datetime.now = mocker.Mock(return_value=future_time)
|
2025-11-08 23:57:08 +01:00
|
|
|
|
|
|
|
|
shared_item = await file_service_instance.get_shared_item(share_id)
|
|
|
|
|
assert shared_item is None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- FileBrowserView Tests ---
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_get_unauthorized(client: TestClient):
|
|
|
|
|
resp = await client.get("/files", allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect to login
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_get_authorized_empty(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.get("/files")
|
|
|
|
|
assert resp.status == 200
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "No files found in this directory." in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
await file_service_instance.create_folder(user_email, "my_folder")
|
|
|
|
|
await file_service_instance.upload_file(user_email, "my_file.txt", b"some content")
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.get("/files")
|
|
|
|
|
assert resp.status == 200
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "my_folder" in text
|
|
|
|
|
assert "my_file.txt" in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert resp.headers["Location"].startswith("/files")
|
|
|
|
|
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
|
|
|
|
|
assert expected_path.is_dir()
|
|
|
|
|
|
2025-11-09 02:34:06 +01:00
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302
|
|
|
|
|
assert "error=Folder+name+is+required" in resp.headers["Location"]
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "existing_folder_web"
|
|
|
|
|
await file_service_instance.create_folder(user_email, folder_name) # Create it first
|
|
|
|
|
|
|
|
|
|
# Mock create_folder to return False, simulating it already exists or failed
|
|
|
|
|
mocker.patch.object(file_service_instance, "create_folder", return_value=False)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302
|
|
|
|
|
assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"]
|
|
|
|
|
|
2025-11-08 23:57:08 +01:00
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_content = b"Uploaded content from web."
|
|
|
|
|
|
|
|
|
|
from io import BytesIO
|
|
|
|
|
data = aiohttp.FormData()
|
|
|
|
|
data.add_field('file',
|
|
|
|
|
BytesIO(file_content),
|
|
|
|
|
filename='uploaded.txt',
|
|
|
|
|
content_type='text/plain')
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
|
2025-11-09 00:52:16 +01:00
|
|
|
assert resp.status == 200
|
2025-11-08 23:57:08 +01:00
|
|
|
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / "uploaded.txt"
|
|
|
|
|
assert expected_path.is_file()
|
|
|
|
|
assert expected_path.read_bytes() == file_content
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "web_download.txt"
|
|
|
|
|
file_content = b"Content to be downloaded via web."
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_name, file_content)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.get(f"/files/download/{file_name}")
|
|
|
|
|
assert resp.status == 200
|
|
|
|
|
assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}"
|
|
|
|
|
assert await resp.read() == file_content
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
|
2025-11-09 01:34:53 +01:00
|
|
|
response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
|
|
|
|
|
assert response.status == 404
|
|
|
|
|
assert "File not found" in await response.text()
|
2025-11-08 23:57:08 +01:00
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "web_delete.txt"
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_name, b"delete this")
|
|
|
|
|
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / file_name
|
|
|
|
|
assert expected_path.is_file()
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert resp.headers["Location"].startswith("/files")
|
|
|
|
|
assert not expected_path.is_file()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "web_delete_folder"
|
|
|
|
|
await file_service_instance.create_folder(user_email, folder_name)
|
|
|
|
|
await file_service_instance.upload_file(user_email, f"{folder_name}/nested.txt", b"nested")
|
|
|
|
|
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / folder_name
|
|
|
|
|
assert expected_path.is_dir()
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert resp.headers["Location"].startswith("/files")
|
|
|
|
|
assert not expected_path.is_dir()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
2025-11-09 02:14:26 +01:00
|
|
|
async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
2025-11-08 23:57:08 +01:00
|
|
|
user_email = "test@example.com"
|
2025-11-09 02:14:26 +01:00
|
|
|
file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"]
|
|
|
|
|
for name in file_names:
|
|
|
|
|
await file_service_instance.upload_file(user_email, name, b"content")
|
2025-11-08 23:57:08 +01:00
|
|
|
|
2025-11-09 02:14:26 +01:00
|
|
|
paths_to_delete = [f"{name}" for name in file_names]
|
|
|
|
|
|
|
|
|
|
# Construct FormData for multiple paths
|
|
|
|
|
data = aiohttp.FormData()
|
|
|
|
|
for path in paths_to_delete:
|
|
|
|
|
data.add_field('paths[]', path)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert resp.headers["Location"].startswith("/files")
|
|
|
|
|
|
|
|
|
|
for name in file_names:
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / name
|
|
|
|
|
assert not expected_path.is_file()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"]
|
|
|
|
|
for name in folder_names:
|
|
|
|
|
await file_service_instance.create_folder(user_email, name)
|
|
|
|
|
await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content")
|
|
|
|
|
|
|
|
|
|
paths_to_delete = [f"{name}" for name in folder_names]
|
|
|
|
|
|
|
|
|
|
# Construct FormData for multiple paths
|
|
|
|
|
data = aiohttp.FormData()
|
|
|
|
|
for path in paths_to_delete:
|
|
|
|
|
data.add_field('paths[]', path)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert resp.headers["Location"].startswith("/files")
|
|
|
|
|
|
|
|
|
|
for name in folder_names:
|
|
|
|
|
expected_path = temp_user_files_dir / user_email / name
|
|
|
|
|
assert not expected_path.is_dir()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302
|
|
|
|
|
assert "error=No+items+selected+for+deletion" in resp.headers["Location"]
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, temp_user_files_dir, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_names = ["fail_delete_1.txt", "fail_delete_2.txt"]
|
|
|
|
|
for name in file_names:
|
|
|
|
|
await file_service_instance.upload_file(user_email, name, b"content")
|
|
|
|
|
|
|
|
|
|
paths_to_delete = [f"{name}" for name in file_names]
|
|
|
|
|
|
|
|
|
|
# Mock delete_item to fail for the first item
|
|
|
|
|
original_delete_item = file_service_instance.delete_item
|
|
|
|
|
async def mock_delete_item(email, path):
|
|
|
|
|
if path == file_names[0]:
|
|
|
|
|
return False # Simulate failure for the first item
|
|
|
|
|
return await original_delete_item(email, path)
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item)
|
|
|
|
|
|
|
|
|
|
data = aiohttp.FormData()
|
|
|
|
|
for path in paths_to_delete:
|
|
|
|
|
data.add_field('paths[]', path)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
|
|
|
|
assert resp.status == 302 # Redirect
|
|
|
|
|
assert "error=Some+items+failed+to+delete" in resp.headers["Location"]
|
|
|
|
|
|
|
|
|
|
# Check if the first file still exists (failed to delete)
|
|
|
|
|
assert (temp_user_files_dir / user_email / file_names[0]).is_file()
|
|
|
|
|
# Check if the second file is deleted (succeeded)
|
|
|
|
|
assert not (temp_user_files_dir / user_email / file_names[1]).is_file()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.post("/files/share_multiple", json={"paths": []})
|
|
|
|
|
assert resp.status == 400
|
2025-11-08 23:57:08 +01:00
|
|
|
data = await resp.json()
|
2025-11-09 02:14:26 +01:00
|
|
|
assert data["error"] == "No items selected for sharing"
|
|
|
|
|
|
2025-11-09 02:34:06 +01:00
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_share_file_missing_path(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL
|
|
|
|
|
assert resp.status == 400
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
assert data["error"] == "File path is required for sharing"
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "fail_share_link.txt"
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_name, b"content")
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "generate_share_link", return_value=None)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post(f"/files/share/{file_name}")
|
|
|
|
|
assert resp.status == 500
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
assert data["error"] == "Failed to generate share link"
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient):
|
|
|
|
|
resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL
|
|
|
|
|
assert resp.status == 302
|
|
|
|
|
assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"]
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "fail_delete.txt"
|
|
|
|
|
await file_service_instance.upload_file(user_email, file_name, b"content")
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "delete_item", return_value=False)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
|
|
|
|
|
assert resp.status == 302
|
|
|
|
|
assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"]
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
folder_name = "shared_folder"
|
|
|
|
|
file_name = "nested.txt"
|
|
|
|
|
share_id = "test_share_id"
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
|
|
|
|
"user_email": user_email,
|
|
|
|
|
"item_path": folder_name,
|
|
|
|
|
"share_id": share_id,
|
|
|
|
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
|
|
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
|
|
|
|
})
|
|
|
|
|
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: True))
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
|
|
|
|
|
|
|
|
|
|
resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}")
|
|
|
|
|
assert resp.status == 404
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Shared file not found or inaccessible within the shared folder." in text
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "shared_file.txt"
|
|
|
|
|
share_id = "test_share_id"
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
|
|
|
|
"user_email": user_email,
|
|
|
|
|
"item_path": file_name,
|
|
|
|
|
"share_id": share_id,
|
|
|
|
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
|
|
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
|
|
|
|
})
|
|
|
|
|
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: False))
|
|
|
|
|
|
|
|
|
|
resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt")
|
|
|
|
|
assert resp.status == 400
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Cannot download specific files from a shared item that is not a folder." in text
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
|
|
|
|
|
resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt")
|
|
|
|
|
assert resp.status == 404
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Shared link is invalid or has expired." in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient):
|
|
|
|
|
resp = await client.get("/shared_file/some_share_id/download")
|
|
|
|
|
assert resp.status == 400
|
|
|
|
|
assert "File path is required for download from shared folder." in await resp.text()
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_name = "shared_file.txt"
|
|
|
|
|
share_id = "test_share_id"
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
|
|
|
|
"user_email": user_email,
|
|
|
|
|
"item_path": file_name,
|
|
|
|
|
"share_id": share_id,
|
|
|
|
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
|
|
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
|
|
|
|
})
|
|
|
|
|
mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
|
|
|
|
|
|
|
|
|
|
resp = await client.get(f"/shared_file/{share_id}")
|
|
|
|
|
assert resp.status == 404
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Shared file not found or inaccessible" in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
item_path = "mystery_item"
|
|
|
|
|
share_id = "test_share_id"
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
|
|
|
|
"user_email": user_email,
|
|
|
|
|
"item_path": item_path,
|
|
|
|
|
"share_id": share_id,
|
|
|
|
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
|
|
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
# Mock Path.is_file and Path.is_dir to return False
|
|
|
|
|
mocker.patch("pathlib.Path.is_file", return_value=False)
|
|
|
|
|
mocker.patch("pathlib.Path.is_dir", return_value=False)
|
|
|
|
|
|
|
|
|
|
resp = await client.get(f"/shared_file/{share_id}")
|
|
|
|
|
assert resp.status == 404
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Shared item not found" in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker):
|
|
|
|
|
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
|
|
|
|
|
resp = await client.get("/shared_file/nonexistent_share_id")
|
|
|
|
|
assert resp.status == 404
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
assert "Shared link is invalid or has expired." in text
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker):
|
|
|
|
|
# Mock the route name to simulate an unknown action
|
|
|
|
|
mock_route = mocker.MagicMock(name="unknown_action")
|
|
|
|
|
mock_route.current_app = logged_in_client.app # Provide a mock current_app
|
|
|
|
|
mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route})
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False)
|
|
|
|
|
assert resp.status == 400
|
|
|
|
|
assert "Unknown file action" in await resp.text()
|
|
|
|
|
|
2025-11-09 02:14:26 +01:00
|
|
|
@pytest.mark.asyncio
|
|
|
|
|
async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
|
|
|
|
|
user_email = "test@example.com"
|
|
|
|
|
file_names = ["fail_share_1.txt", "fail_share_2.txt"]
|
|
|
|
|
for name in file_names:
|
|
|
|
|
await file_service_instance.upload_file(user_email, name, b"content to share")
|
|
|
|
|
|
|
|
|
|
paths_to_share = [f"{name}" for name in file_names]
|
|
|
|
|
|
|
|
|
|
# Mock generate_share_link to fail for the first item
|
|
|
|
|
original_generate_share_link = file_service_instance.generate_share_link
|
|
|
|
|
async def mock_generate_share_link(email, path):
|
|
|
|
|
if path == file_names[0]:
|
|
|
|
|
return None # Simulate failure for the first item
|
|
|
|
|
return await original_generate_share_link(email, path)
|
|
|
|
|
|
|
|
|
|
mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link)
|
|
|
|
|
|
|
|
|
|
resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share})
|
|
|
|
|
assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
assert "share_links" in data
|
|
|
|
|
assert len(data["share_links"]) == 1 # Only one link should be generated
|
|
|
|
|
assert data["share_links"][0]["name"] == file_names[1] # The successful one
|
|
|
|
|
|
2025-11-08 23:57:08 +01:00
|
|
|
|