From 6248a2086ca0c70f938a4b56c93f3ad8a1050c24 Mon Sep 17 00:00:00 2001 From: retoor Date: Sat, 8 Nov 2025 23:57:08 +0100 Subject: [PATCH] Upate. --- retoors/services/file_service.py | 151 +++++++ retoors/static/js/components/order_form.js | 55 +++ retoors/static/js/components/users.js | 99 +++++ retoors/templates/layouts/dashboard.html | 50 +++ retoors/templates/pages/add_user.html | 55 +++ retoors/templates/pages/edit_user.html | 63 +++ retoors/templates/pages/user_details.html | 55 +++ retoors/templates/pages/users.html | 34 ++ tests/test_file_browser.py | 482 +++++++++++++++++++++ 9 files changed, 1044 insertions(+) create mode 100644 retoors/services/file_service.py create mode 100644 retoors/static/js/components/order_form.js create mode 100644 retoors/static/js/components/users.js create mode 100644 retoors/templates/layouts/dashboard.html create mode 100644 retoors/templates/pages/add_user.html create mode 100644 retoors/templates/pages/edit_user.html create mode 100644 retoors/templates/pages/user_details.html create mode 100644 retoors/templates/pages/users.html create mode 100644 tests/test_file_browser.py diff --git a/retoors/services/file_service.py b/retoors/services/file_service.py new file mode 100644 index 0000000..217e225 --- /dev/null +++ b/retoors/services/file_service.py @@ -0,0 +1,151 @@ +import aiofiles +import json +from pathlib import Path +import shutil +import uuid +import datetime + +class FileService: + def __init__(self, base_dir: Path, users_data_path: Path): + self.base_dir = base_dir + self.users_data_path = users_data_path + self.base_dir.mkdir(parents=True, exist_ok=True) + + async def _load_users_data(self): + """Loads user data from the JSON file.""" + if not self.users_data_path.exists(): + return {} + async with aiofiles.open(self.users_data_path, mode="r") as f: + content = await f.read() + return json.loads(content) if content else {} + + async def _save_users_data(self, data): + """Saves user data to the JSON file.""" + async with aiofiles.open(self.users_data_path, mode="w") as f: + await f.write(json.dumps(data, indent=4)) + + def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path: + """Constructs the absolute path for a user's file or directory.""" + user_dir = self.base_dir / user_email + return user_dir / relative_path + + async def list_files(self, user_email: str, path: str = "") -> list: + """Lists files and directories for a given user within a specified path.""" + user_path = self._get_user_file_path(user_email, path) + if not user_path.is_dir(): + return [] + + files_list = [] + for item in user_path.iterdir(): + if item.name.startswith('.'): # Ignore hidden files/directories + continue + file_info = { + "name": item.name, + "is_dir": item.is_dir(), + "path": str(item.relative_to(self._get_user_file_path(user_email))), + "size": item.stat().st_size if item.is_file() else 0, + "last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(), + } + files_list.append(file_info) + return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower())) + + async def create_folder(self, user_email: str, folder_path: str) -> bool: + """Creates a new folder for the user.""" + full_path = self._get_user_file_path(user_email, folder_path) + if full_path.exists(): + return False # Folder already exists + full_path.mkdir(parents=True, exist_ok=True) + return True + + async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool: + """Uploads a file for the user.""" + full_path = self._get_user_file_path(user_email, file_path) + full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist + async with aiofiles.open(full_path, mode="wb") as f: + await f.write(content) + return True + + async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None: + """Downloads a file for the user.""" + full_path = self._get_user_file_path(user_email, file_path) + if full_path.is_file(): + async with aiofiles.open(full_path, mode="rb") as f: + content = await f.read() + return content, full_path.name + return None + + async def delete_item(self, user_email: str, item_path: str) -> bool: + """Deletes a file or folder for the user.""" + full_path = self._get_user_file_path(user_email, item_path) + if not full_path.exists(): + return False + + if full_path.is_file(): + full_path.unlink() + elif full_path.is_dir(): + shutil.rmtree(full_path) + return True + + async def generate_share_link(self, user_email: str, item_path: str) -> str | None: + """Generates a shareable link for a file or folder.""" + users_data = await self._load_users_data() + user = users_data.get(user_email) + if not user: + return None + + full_path = self._get_user_file_path(user_email, item_path) + if not full_path.exists(): + return None + + share_id = str(uuid.uuid4()) + if "shared_items" not in user: + user["shared_items"] = {} + user["shared_items"][share_id] = { + "user_email": user_email, + "item_path": item_path, + "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), + "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry + } + await self._save_users_data(users_data) + return share_id + + async def get_shared_item(self, share_id: str) -> dict | None: + """Retrieves information about a shared item.""" + users_data = await self._load_users_data() + for user_email, user_info in users_data.items(): + if "shared_items" in user_info and share_id in user_info["shared_items"]: + shared_item = user_info["shared_items"][share_id] + expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"]) + if expiry_time > datetime.datetime.now(datetime.timezone.utc): + return shared_item + return None + + async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None: + """Retrieves the content of a shared file.""" + shared_item = await self.get_shared_item(share_id) + if not shared_item: + return None + + user_email = shared_item["user_email"] + item_path = shared_item["item_path"] + full_path = self._get_user_file_path(user_email, item_path) + + if full_path.is_file(): + async with aiofiles.open(full_path, mode="rb") as f: + content = await f.read() + return content, full_path.name + return None + + async def get_shared_folder_content(self, share_id: str) -> list | None: + """Retrieves the content of a shared folder.""" + shared_item = await self.get_shared_item(share_id) + if not shared_item: + return None + + user_email = shared_item["user_email"] + item_path = shared_item["item_path"] + full_path = self._get_user_file_path(user_email, item_path) + + if full_path.is_dir(): + return await self.list_files(user_email, item_path) + return None diff --git a/retoors/static/js/components/order_form.js b/retoors/static/js/components/order_form.js new file mode 100644 index 0000000..dc64a7f --- /dev/null +++ b/retoors/static/js/components/order_form.js @@ -0,0 +1,55 @@ +document.addEventListener('DOMContentLoaded', () => { + const mainOrderForm = document.querySelector('.order-form'); + const customSlider = document.querySelector('custom-slider'); + const totalPriceSpan = document.getElementById('total_price'); + + if (!customSlider || !totalPriceSpan) { + return; + } + + const pricePerGb = parseFloat(customSlider.dataset.pricePerGb); + + function updateTotalPrice() { + const storageAmount = parseFloat(customSlider.value); + const total = (storageAmount * pricePerGb).toFixed(2); + totalPriceSpan.textContent = `$${total}`; + } + + updateTotalPrice(); + customSlider.addEventListener('input', updateTotalPrice); + + if (mainOrderForm) { + mainOrderForm.addEventListener('submit', async (event) => { + event.preventDefault(); + + const storageAmount = parseFloat(customSlider.value); + if (isNaN(storageAmount) || storageAmount <= 0) { + alert('Please select a valid storage amount.'); + return; + } + + try { + const currentUserEmail = '{{ user.email }}'; + + const response = await fetch(`/api/users/${currentUserEmail}/quota`, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ new_quota_gb: storageAmount }), + }); + + const data = await response.json(); + if (!response.ok) { + throw new Error(data.error || 'Failed to update quota.'); + } + + alert(data.message || 'Quota updated successfully!'); + window.location.reload(); + } catch (error) { + console.error('Error updating quota:', error); + alert(`Failed to update quota: ${error.message}`); + } + }); + } +}); diff --git a/retoors/static/js/components/users.js b/retoors/static/js/components/users.js new file mode 100644 index 0000000..66405d9 --- /dev/null +++ b/retoors/static/js/components/users.js @@ -0,0 +1,99 @@ +document.addEventListener('DOMContentLoaded', () => { + const userQuotaList = document.getElementById('user-quota-list'); + + async function fetchAndRenderUsers() { + try { + const response = await fetch('/api/users'); + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + const data = await response.json(); + userQuotaList.innerHTML = ''; + + if (data.users.length === 0) { + userQuotaList.innerHTML = '

No users found. Click "Add New User" to create one.

'; + return; + } + + data.users.forEach(user => { + const quotaItem = document.createElement('div'); + quotaItem.className = 'user-quota-item card'; + const percentage = user.storage_quota_gb > 0 ? ((user.storage_used_gb / user.storage_quota_gb) * 100).toFixed(2) : 0; + + quotaItem.innerHTML = ` +
+

${user.full_name}

+

${user.email}

+
+
+
+
+
+

${user.storage_used_gb} GB / ${user.storage_quota_gb} GB (${percentage}%)

+
+
+ View Details + Edit Quota + + ${user.parent_email === null ? `` : ''} +
+ `; + userQuotaList.appendChild(quotaItem); + }); + + attachButtonListeners(); + + } catch (error) { + console.error('Error fetching users:', error); + userQuotaList.innerHTML = '

Error loading user quotas.

'; + } + } + + function attachButtonListeners() { + document.querySelectorAll('.delete-user-btn').forEach(button => { + button.addEventListener('click', async (event) => { + const email = event.target.dataset.email; + if (confirm(`Are you sure you want to delete user ${email}?`)) { + try { + const response = await fetch(`/api/users/${email}`, { + method: 'DELETE', + }); + if (!response.ok) { + const errorData = await response.json(); + throw new Error(errorData.error || `HTTP error! status: ${response.status}`); + } + alert(`User ${email} deleted successfully.`); + fetchAndRenderUsers(); + } catch (error) { + console.error('Error deleting user:', error); + alert(`Failed to delete user: ${error.message}`); + } + } + }); + }); + + document.querySelectorAll('.delete-team-btn').forEach(button => { + button.addEventListener('click', async (event) => { + const parentEmail = event.target.dataset.parentEmail; + if (confirm(`Are you sure you want to delete the team managed by ${parentEmail}? This will delete all users created by this account.`)) { + try { + const response = await fetch(`/api/teams/${parentEmail}`, { + method: 'DELETE', + }); + if (!response.ok) { + const errorData = await response.json(); + throw new Error(errorData.error || `HTTP error! status: ${response.status}`); + } + alert(`Team managed by ${parentEmail} and associated users deleted successfully.`); + fetchAndRenderUsers(); + } catch (error) { + console.error('Error deleting team:', error); + alert(`Failed to delete team: ${error.message}`); + } + } + }); + }); + } + + fetchAndRenderUsers(); +}); diff --git a/retoors/templates/layouts/dashboard.html b/retoors/templates/layouts/dashboard.html new file mode 100644 index 0000000..94d2e69 --- /dev/null +++ b/retoors/templates/layouts/dashboard.html @@ -0,0 +1,50 @@ +{% extends "layouts/base.html" %} + +{% block head %} + + {% block dashboard_head %}{% endblock %} +{% endblock %} + +{% block content %} +
+ + +
+
+

{% block page_title %}Dashboard{% endblock %}

+
+ {% block dashboard_actions %}{% endblock %} +
+
+ + {% block dashboard_content %}{% endblock %} +
+
+{% endblock %} diff --git a/retoors/templates/pages/add_user.html b/retoors/templates/pages/add_user.html new file mode 100644 index 0000000..8ea4d36 --- /dev/null +++ b/retoors/templates/pages/add_user.html @@ -0,0 +1,55 @@ +{% extends "layouts/dashboard.html" %} + +{% block title %}Add New User - Retoor's Cloud Solutions{% endblock %} + +{% block dashboard_head %} + +{% endblock %} + +{% block page_title %}Add New User{% endblock %} + +{% block dashboard_content %} +
+
+

Create New User Account

+

Add a new user to your team.

+ + {% if errors %} +
+
    + {% for field, error in errors.items() %} +
  • {{ error }}
  • + {% endfor %} +
+
+ {% endif %} + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + Cancel +
+
+
+
+{% endblock %} diff --git a/retoors/templates/pages/edit_user.html b/retoors/templates/pages/edit_user.html new file mode 100644 index 0000000..f66c85e --- /dev/null +++ b/retoors/templates/pages/edit_user.html @@ -0,0 +1,63 @@ +{% extends "layouts/dashboard.html" %} + +{% block title %}Edit User - Retoor's Cloud Solutions{% endblock %} + +{% block dashboard_head %} + +{% endblock %} + +{% block page_title %}Edit User Quota{% endblock %} + +{% block dashboard_content %} +
+
+

Edit User Storage Quota

+

Adjust storage quota for {{ user_data.full_name }} ({{ user_data.email }}).

+ + {% if errors %} +
+
    + {% for field, error in errors.items() %} +
  • {{ error }}
  • + {% endfor %} +
+
+ {% endif %} + + {% if success_message %} +
+ {{ success_message }} +
+ {% endif %} + +
+
+

Current Usage:

+
+
+ {{ user_data.storage_used_gb }} GB / {{ user_data.storage_quota_gb }} GB +
+
+
+
+
+

{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used

+
+ +
+

Update Quota

+
+
+ + +
+
+ + Cancel +
+
+
+
+
+
+{% endblock %} diff --git a/retoors/templates/pages/user_details.html b/retoors/templates/pages/user_details.html new file mode 100644 index 0000000..7ef52fe --- /dev/null +++ b/retoors/templates/pages/user_details.html @@ -0,0 +1,55 @@ +{% extends "layouts/dashboard.html" %} + +{% block title %}User Details - Retoor's Cloud Solutions{% endblock %} + +{% block dashboard_head %} + +{% endblock %} + +{% block page_title %}User Details{% endblock %} + +{% block dashboard_actions %} + Edit Quota +{% endblock %} + +{% block dashboard_content %} +
+
+

{{ user_data.full_name }}

+

{{ user_data.email }}

+ +
+
+

Storage Usage:

+
+
+ {{ user_data.storage_used_gb }} GB / {{ user_data.storage_quota_gb }} GB +
+
+
+
+
+

{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used

+
+ +
+

Account Information

+
+

Full Name: {{ user_data.full_name }}

+

Email: {{ user_data.email }}

+

Storage Used: {{ user_data.storage_used_gb }} GB

+

Storage Quota: {{ user_data.storage_quota_gb }} GB

+

Parent Email: {{ user_data.parent_email if user_data.parent_email else 'N/A' }}

+
+
+ Edit Quota +
+ +
+ Back to Users +
+
+
+
+
+{% endblock %} diff --git a/retoors/templates/pages/users.html b/retoors/templates/pages/users.html new file mode 100644 index 0000000..db89119 --- /dev/null +++ b/retoors/templates/pages/users.html @@ -0,0 +1,34 @@ +{% extends "layouts/dashboard.html" %} + +{% block title %}User Management - Retoor's Cloud Solutions{% endblock %} + +{% block dashboard_head %} + +{% endblock %} + +{% block page_title %}Users{% endblock %} + +{% block dashboard_actions %} + + Add New User +{% endblock %} + +{% block dashboard_content %} +{% if success_message %} +
+ {{ success_message }} +
+{% endif %} + +{% if error_message %} +
+ {{ error_message }} +
+{% endif %} + +
+
+
+
+ + +{% endblock %} diff --git a/tests/test_file_browser.py b/tests/test_file_browser.py new file mode 100644 index 0000000..91f7ea5 --- /dev/null +++ b/tests/test_file_browser.py @@ -0,0 +1,482 @@ +import pytest +from unittest.mock import MagicMock +from pathlib import Path +import json +import datetime +import aiohttp +from aiohttp.test_utils import TestClient +from aiohttp_session import setup as setup_session +from aiohttp_session.cookie_storage import EncryptedCookieStorage +from retoors.helpers.env_manager import get_or_create_session_secret_key + +# Assuming the FileService is in retoors/services/file_service.py +# and the FileBrowserView is in retoors/views/site.py + +@pytest.fixture +def temp_user_files_dir(tmp_path): + """Fixture to create a temporary directory for user files.""" + user_files_dir = tmp_path / "user_files" + user_files_dir.mkdir() + return user_files_dir + +@pytest.fixture +def temp_users_json(tmp_path): + """Fixture to create a temporary users.json file.""" + users_json_path = tmp_path / "users.json" + initial_users_data = { + "test@example.com": { + "full_name": "Test User", + "email": "test@example.com", + "password": "hashed_password", + "storage_quota_gb": 10, + "storage_used_gb": 0, + "parent_email": None, + "shared_items": {} + }, + "child@example.com": { + "full_name": "Child User", + "email": "child@example.com", + "password": "hashed_password", + "storage_quota_gb": 5, + "storage_used_gb": 0, + "parent_email": "test@example.com", + "shared_items": {} + } + } + with open(users_json_path, "w") as f: + json.dump(initial_users_data, f) + return users_json_path + +@pytest.fixture +def file_service_instance(temp_user_files_dir, temp_users_json): + """Fixture to provide a FileService instance with temporary directories.""" + from retoors.services.file_service import FileService + return FileService(temp_user_files_dir, temp_users_json) + +@pytest.fixture +async def logged_in_client(aiohttp_client, create_test_app, mocker): + """Fixture to provide an aiohttp client with a logged-in user.""" + app = create_test_app + client = await aiohttp_client(app) + + user_service = app["user_service"] + + def mock_create_user(full_name, email, password, parent_email=None): + return { + "full_name": full_name, + "email": email, + "password": "hashed_password", + "storage_quota_gb": 10, + "storage_used_gb": 0, + "parent_email": parent_email, + "shared_items": {} + } + + def mock_authenticate_user(email, password): + return { + "email": email, + "full_name": "Test User", + "is_admin": False, + "storage_quota_gb": 10, + "storage_used_gb": 0 + } + + def mock_get_user_by_email(email): + return { + "email": email, + "full_name": "Test User", + "is_admin": False, + "storage_quota_gb": 10, + "storage_used_gb": 0 + } + + mocker.patch.object(user_service, "create_user", side_effect=mock_create_user) + mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user) + mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email) + + await client.post( + "/register", + data={ + "full_name": "Test User", + "email": "test@example.com", + "password": "password", + "confirm_password": "password", + }, + ) + await client.post( + "/login", data={"email": "test@example.com", "password": "password"} + ) + + return client + +@pytest.fixture +async def logged_in_admin_client(aiohttp_client, create_test_app, mocker): + """Fixture to provide an aiohttp client with a logged-in admin user.""" + app = create_test_app + client = await aiohttp_client(app) + + user_service = app["user_service"] + + def mock_create_user(full_name, email, password, parent_email=None): + return { + "full_name": full_name, + "email": email, + "password": "hashed_password", + "storage_quota_gb": 100, + "storage_used_gb": 0, + "parent_email": parent_email, + "shared_items": {} + } + + def mock_authenticate_user(email, password): + return { + "email": email, + "full_name": "Admin User", + "is_admin": True, + "storage_quota_gb": 100, + "storage_used_gb": 0 + } + + def mock_get_user_by_email(email): + return { + "email": email, + "full_name": "Admin User", + "is_admin": True, + "storage_quota_gb": 100, + "storage_used_gb": 0 + } + + mocker.patch.object(user_service, "create_user", side_effect=mock_create_user) + mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user) + mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email) + + await client.post( + "/register", + data={ + "full_name": "Admin User", + "email": "admin@example.com", + "password": "password", + "confirm_password": "password", + }, + ) + await client.post( + "/login", data={"email": "admin@example.com", "password": "password"} + ) + return client + + +# --- FileService Tests --- + +@pytest.mark.asyncio +async def test_file_service_list_files_empty(file_service_instance): + user_email = "test@example.com" + files = await file_service_instance.list_files(user_email) + assert files == [] + +@pytest.mark.asyncio +async def test_file_service_create_folder(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + folder_name = "my_new_folder" + success = await file_service_instance.create_folder(user_email, folder_name) + assert success + expected_path = temp_user_files_dir / user_email / folder_name + assert expected_path.is_dir() + +@pytest.mark.asyncio +async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + folder_name = "existing_folder" + (temp_user_files_dir / user_email).mkdir(parents=True) + (temp_user_files_dir / user_email / folder_name).mkdir(parents=True) + success = await file_service_instance.create_folder(user_email, folder_name) + assert not success # Should return False if folder already exists + +@pytest.mark.asyncio +async def test_file_service_upload_file(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + file_name = "document.txt" + file_content = b"Hello, world!" + success = await file_service_instance.upload_file(user_email, file_name, file_content) + assert success + expected_path = temp_user_files_dir / user_email / file_name + assert expected_path.is_file() + assert expected_path.read_bytes() == file_content + +@pytest.mark.asyncio +async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + await file_service_instance.create_folder(user_email, "folder1") + await file_service_instance.upload_file(user_email, "file1.txt", b"content1") + await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2") + + files = await file_service_instance.list_files(user_email) + assert len(files) == 2 + assert any(f["name"] == "folder1" and f["is_dir"] for f in files) + assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files) + + files_in_folder1 = await file_service_instance.list_files(user_email, "folder1") + assert len(files_in_folder1) == 1 + assert files_in_folder1[0]["name"] == "file2.txt" + +@pytest.mark.asyncio +async def test_file_service_download_file(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + file_name = "download.txt" + file_content = b"Downloadable content." + await file_service_instance.upload_file(user_email, file_name, file_content) + + content, name = await file_service_instance.download_file(user_email, file_name) + assert content == file_content + assert name == file_name + +@pytest.mark.asyncio +async def test_file_service_download_file_not_found(file_service_instance): + user_email = "test@example.com" + content = await file_service_instance.download_file(user_email, "nonexistent.txt") + assert content is None + +@pytest.mark.asyncio +async def test_file_service_delete_file(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + file_name = "to_delete.txt" + (temp_user_files_dir / user_email).mkdir(exist_ok=True) + (temp_user_files_dir / user_email / file_name).write_bytes(b"delete me") + + success = await file_service_instance.delete_item(user_email, file_name) + assert success + assert not (temp_user_files_dir / user_email / file_name).exists() + +@pytest.mark.asyncio +async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + folder_name = "folder_to_delete" + (temp_user_files_dir / user_email).mkdir(parents=True) + (temp_user_files_dir / user_email / folder_name).mkdir(parents=True) + (temp_user_files_dir / user_email / folder_name / "nested.txt").write_bytes(b"nested") + + success = await file_service_instance.delete_item(user_email, folder_name) + assert success + assert not (temp_user_files_dir / user_email / folder_name).exists() + +@pytest.mark.asyncio +async def test_file_service_delete_nonexistent(file_service_instance): + user_email = "test@example.com" + success = await file_service_instance.delete_item(user_email, "nonexistent_item") + assert not success + +@pytest.mark.asyncio +async def test_file_service_generate_share_link(file_service_instance): + user_email = "test@example.com" + file_path = "shareable.txt" + await file_service_instance.upload_file(user_email, file_path, b"share content") + + share_id = await file_service_instance.generate_share_link(user_email, file_path) + assert share_id is not None + assert isinstance(share_id, str) + + shared_item = await file_service_instance.get_shared_item(share_id) + assert shared_item is not None + assert shared_item["user_email"] == user_email + assert shared_item["item_path"] == file_path + +@pytest.mark.asyncio +async def test_file_service_get_shared_file_content(file_service_instance): + user_email = "test@example.com" + file_path = "shared_file.txt" + content_to_share = b"This is shared content." + await file_service_instance.upload_file(user_email, file_path, content_to_share) + share_id = await file_service_instance.generate_share_link(user_email, file_path) + + retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id) + assert retrieved_content == content_to_share + assert filename == "shared_file.txt" + +@pytest.mark.asyncio +async def test_file_service_get_shared_folder_content(file_service_instance): + user_email = "test@example.com" + folder_path = "shared_folder" + await file_service_instance.create_folder(user_email, folder_path) + await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content") + share_id = await file_service_instance.generate_share_link(user_email, folder_path) + + retrieved_content = await file_service_instance.get_shared_folder_content(share_id) + assert len(retrieved_content) == 1 + assert retrieved_content[0]["name"] == "nested.txt" + +@pytest.mark.asyncio +async def test_file_service_shared_link_expiry(file_service_instance, mocker): + user_email = "test@example.com" + file_path = "expiring_file.txt" + await file_service_instance.upload_file(user_email, file_path, b"expiring content") + share_id = await file_service_instance.generate_share_link(user_email, file_path) + + # Mock datetime to simulate an expired link + future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8) + mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime)) + datetime.datetime.now.return_value = future_time + + shared_item = await file_service_instance.get_shared_item(share_id) + assert shared_item is None + + +# --- FileBrowserView Tests --- + +@pytest.mark.asyncio +async def test_file_browser_get_unauthorized(client: TestClient): + resp = await client.get("/files", allow_redirects=False) + assert resp.status == 302 # Redirect to login + +@pytest.mark.asyncio +async def test_file_browser_get_authorized_empty(logged_in_client: TestClient): + resp = await logged_in_client.get("/files") + assert resp.status == 200 + text = await resp.text() + assert "No files found in this directory." in text + +@pytest.mark.asyncio +async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance): + user_email = "test@example.com" + await file_service_instance.create_folder(user_email, "my_folder") + await file_service_instance.upload_file(user_email, "my_file.txt", b"some content") + + resp = await logged_in_client.get("/files") + assert resp.status == 200 + text = await resp.text() + assert "my_folder" in text + assert "my_file.txt" in text + +@pytest.mark.asyncio +async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False) + assert resp.status == 302 # Redirect + assert resp.headers["Location"].startswith("/files") + + expected_path = temp_user_files_dir / user_email / "new_folder_via_web" + assert expected_path.is_dir() + +@pytest.mark.asyncio +async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + file_content = b"Uploaded content from web." + + from io import BytesIO + data = aiohttp.FormData() + data.add_field('file', + BytesIO(file_content), + filename='uploaded.txt', + content_type='text/plain') + + resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False) + assert resp.status == 302 # Redirect + assert resp.headers["Location"].startswith("/files") + + expected_path = temp_user_files_dir / user_email / "uploaded.txt" + assert expected_path.is_file() + assert expected_path.read_bytes() == file_content + +@pytest.mark.asyncio +async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance): + user_email = "test@example.com" + file_name = "web_download.txt" + file_content = b"Content to be downloaded via web." + await file_service_instance.upload_file(user_email, file_name, file_content) + + resp = await logged_in_client.get(f"/files/download/{file_name}") + assert resp.status == 200 + assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}" + assert await resp.read() == file_content + +@pytest.mark.asyncio +async def test_file_browser_download_file_not_found(logged_in_client: TestClient): + resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False) + assert resp.status == 404 + +@pytest.mark.asyncio +async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + file_name = "web_delete.txt" + await file_service_instance.upload_file(user_email, file_name, b"delete this") + + expected_path = temp_user_files_dir / user_email / file_name + assert expected_path.is_file() + + resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False) + assert resp.status == 302 # Redirect + assert resp.headers["Location"].startswith("/files") + assert not expected_path.is_file() + +@pytest.mark.asyncio +async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): + user_email = "test@example.com" + folder_name = "web_delete_folder" + await file_service_instance.create_folder(user_email, folder_name) + await file_service_instance.upload_file(user_email, f"{folder_name}/nested.txt", b"nested") + + expected_path = temp_user_files_dir / user_email / folder_name + assert expected_path.is_dir() + + resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False) + assert resp.status == 302 # Redirect + assert resp.headers["Location"].startswith("/files") + assert not expected_path.is_dir() + +@pytest.mark.asyncio +async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance): + user_email = "test@example.com" + file_name = "web_share.txt" + await file_service_instance.upload_file(user_email, file_name, b"shareable content") + + resp = await logged_in_client.post(f"/files/share/{file_name}") + assert resp.status == 200 + data = await resp.json() + assert "share_link" in data + assert "http" in data["share_link"] # Check if it's a URL + + # Verify the shared item can be retrieved + # The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID + share_id = data["share_link"].split("/")[-1] + shared_item = await file_service_instance.get_shared_item(share_id) + assert shared_item is not None + assert shared_item["item_path"] == file_name + +@pytest.fixture +def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance): + """Fixture to create a test aiohttp application with mocked services.""" + from aiohttp import web + from retoors.middlewares import user_middleware, error_middleware + from retoors.services.user_service import UserService + from retoors.routes import setup_routes + import aiohttp_jinja2 + import jinja2 + + app = web.Application() + + # Setup session for the test app + project_root = Path(__file__).parent.parent + env_file_path = project_root / ".env" + secret_key = get_or_create_session_secret_key(env_file_path) + setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8"))) + + app.middlewares.append(error_middleware) + app.middlewares.append(user_middleware) + + # Mock UserService + mock_user_service = mocker.MagicMock(spec=UserService) + + # Mock scheduler + mock_scheduler = mocker.MagicMock() + mock_scheduler.spawn = mocker.AsyncMock() + mock_scheduler.close = mocker.AsyncMock() + + app["user_service"] = mock_user_service + app["file_service"] = file_service_instance + app["scheduler"] = mock_scheduler + + # Setup Jinja2 for templates + base_path = Path(__file__).parent.parent / "retoors" + templates_path = base_path / "templates" + aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path))) + + setup_routes(app) + return app