diff --git a/retoors/services/file_service.py b/retoors/services/file_service.py
new file mode 100644
index 0000000..217e225
--- /dev/null
+++ b/retoors/services/file_service.py
@@ -0,0 +1,151 @@
+import aiofiles
+import json
+from pathlib import Path
+import shutil
+import uuid
+import datetime
+
+class FileService:
+ def __init__(self, base_dir: Path, users_data_path: Path):
+ self.base_dir = base_dir
+ self.users_data_path = users_data_path
+ self.base_dir.mkdir(parents=True, exist_ok=True)
+
+ async def _load_users_data(self):
+ """Loads user data from the JSON file."""
+ if not self.users_data_path.exists():
+ return {}
+ async with aiofiles.open(self.users_data_path, mode="r") as f:
+ content = await f.read()
+ return json.loads(content) if content else {}
+
+ async def _save_users_data(self, data):
+ """Saves user data to the JSON file."""
+ async with aiofiles.open(self.users_data_path, mode="w") as f:
+ await f.write(json.dumps(data, indent=4))
+
+ def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
+ """Constructs the absolute path for a user's file or directory."""
+ user_dir = self.base_dir / user_email
+ return user_dir / relative_path
+
+ async def list_files(self, user_email: str, path: str = "") -> list:
+ """Lists files and directories for a given user within a specified path."""
+ user_path = self._get_user_file_path(user_email, path)
+ if not user_path.is_dir():
+ return []
+
+ files_list = []
+ for item in user_path.iterdir():
+ if item.name.startswith('.'): # Ignore hidden files/directories
+ continue
+ file_info = {
+ "name": item.name,
+ "is_dir": item.is_dir(),
+ "path": str(item.relative_to(self._get_user_file_path(user_email))),
+ "size": item.stat().st_size if item.is_file() else 0,
+ "last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
+ }
+ files_list.append(file_info)
+ return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
+
+ async def create_folder(self, user_email: str, folder_path: str) -> bool:
+ """Creates a new folder for the user."""
+ full_path = self._get_user_file_path(user_email, folder_path)
+ if full_path.exists():
+ return False # Folder already exists
+ full_path.mkdir(parents=True, exist_ok=True)
+ return True
+
+ async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
+ """Uploads a file for the user."""
+ full_path = self._get_user_file_path(user_email, file_path)
+ full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
+ async with aiofiles.open(full_path, mode="wb") as f:
+ await f.write(content)
+ return True
+
+ async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
+ """Downloads a file for the user."""
+ full_path = self._get_user_file_path(user_email, file_path)
+ if full_path.is_file():
+ async with aiofiles.open(full_path, mode="rb") as f:
+ content = await f.read()
+ return content, full_path.name
+ return None
+
+ async def delete_item(self, user_email: str, item_path: str) -> bool:
+ """Deletes a file or folder for the user."""
+ full_path = self._get_user_file_path(user_email, item_path)
+ if not full_path.exists():
+ return False
+
+ if full_path.is_file():
+ full_path.unlink()
+ elif full_path.is_dir():
+ shutil.rmtree(full_path)
+ return True
+
+ async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
+ """Generates a shareable link for a file or folder."""
+ users_data = await self._load_users_data()
+ user = users_data.get(user_email)
+ if not user:
+ return None
+
+ full_path = self._get_user_file_path(user_email, item_path)
+ if not full_path.exists():
+ return None
+
+ share_id = str(uuid.uuid4())
+ if "shared_items" not in user:
+ user["shared_items"] = {}
+ user["shared_items"][share_id] = {
+ "user_email": user_email,
+ "item_path": item_path,
+ "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
+ "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
+ }
+ await self._save_users_data(users_data)
+ return share_id
+
+ async def get_shared_item(self, share_id: str) -> dict | None:
+ """Retrieves information about a shared item."""
+ users_data = await self._load_users_data()
+ for user_email, user_info in users_data.items():
+ if "shared_items" in user_info and share_id in user_info["shared_items"]:
+ shared_item = user_info["shared_items"][share_id]
+ expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
+ if expiry_time > datetime.datetime.now(datetime.timezone.utc):
+ return shared_item
+ return None
+
+ async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None:
+ """Retrieves the content of a shared file."""
+ shared_item = await self.get_shared_item(share_id)
+ if not shared_item:
+ return None
+
+ user_email = shared_item["user_email"]
+ item_path = shared_item["item_path"]
+ full_path = self._get_user_file_path(user_email, item_path)
+
+ if full_path.is_file():
+ async with aiofiles.open(full_path, mode="rb") as f:
+ content = await f.read()
+ return content, full_path.name
+ return None
+
+ async def get_shared_folder_content(self, share_id: str) -> list | None:
+ """Retrieves the content of a shared folder."""
+ shared_item = await self.get_shared_item(share_id)
+ if not shared_item:
+ return None
+
+ user_email = shared_item["user_email"]
+ item_path = shared_item["item_path"]
+ full_path = self._get_user_file_path(user_email, item_path)
+
+ if full_path.is_dir():
+ return await self.list_files(user_email, item_path)
+ return None
diff --git a/retoors/static/js/components/order_form.js b/retoors/static/js/components/order_form.js
new file mode 100644
index 0000000..dc64a7f
--- /dev/null
+++ b/retoors/static/js/components/order_form.js
@@ -0,0 +1,55 @@
+document.addEventListener('DOMContentLoaded', () => {
+ const mainOrderForm = document.querySelector('.order-form');
+ const customSlider = document.querySelector('custom-slider');
+ const totalPriceSpan = document.getElementById('total_price');
+
+ if (!customSlider || !totalPriceSpan) {
+ return;
+ }
+
+ const pricePerGb = parseFloat(customSlider.dataset.pricePerGb);
+
+ function updateTotalPrice() {
+ const storageAmount = parseFloat(customSlider.value);
+ const total = (storageAmount * pricePerGb).toFixed(2);
+ totalPriceSpan.textContent = `$${total}`;
+ }
+
+ updateTotalPrice();
+ customSlider.addEventListener('input', updateTotalPrice);
+
+ if (mainOrderForm) {
+ mainOrderForm.addEventListener('submit', async (event) => {
+ event.preventDefault();
+
+ const storageAmount = parseFloat(customSlider.value);
+ if (isNaN(storageAmount) || storageAmount <= 0) {
+ alert('Please select a valid storage amount.');
+ return;
+ }
+
+ try {
+ const currentUserEmail = '{{ user.email }}';
+
+ const response = await fetch(`/api/users/${currentUserEmail}/quota`, {
+ method: 'PUT',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({ new_quota_gb: storageAmount }),
+ });
+
+ const data = await response.json();
+ if (!response.ok) {
+ throw new Error(data.error || 'Failed to update quota.');
+ }
+
+ alert(data.message || 'Quota updated successfully!');
+ window.location.reload();
+ } catch (error) {
+ console.error('Error updating quota:', error);
+ alert(`Failed to update quota: ${error.message}`);
+ }
+ });
+ }
+});
diff --git a/retoors/static/js/components/users.js b/retoors/static/js/components/users.js
new file mode 100644
index 0000000..66405d9
--- /dev/null
+++ b/retoors/static/js/components/users.js
@@ -0,0 +1,99 @@
+document.addEventListener('DOMContentLoaded', () => {
+ const userQuotaList = document.getElementById('user-quota-list');
+
+ async function fetchAndRenderUsers() {
+ try {
+ const response = await fetch('/api/users');
+ if (!response.ok) {
+ throw new Error(`HTTP error! status: ${response.status}`);
+ }
+ const data = await response.json();
+ userQuotaList.innerHTML = '';
+
+ if (data.users.length === 0) {
+ userQuotaList.innerHTML = '
No users found. Click "Add New User" to create one.
';
+ return;
+ }
+
+ data.users.forEach(user => {
+ const quotaItem = document.createElement('div');
+ quotaItem.className = 'user-quota-item card';
+ const percentage = user.storage_quota_gb > 0 ? ((user.storage_used_gb / user.storage_quota_gb) * 100).toFixed(2) : 0;
+
+ quotaItem.innerHTML = `
+
+
${user.full_name}
+
${user.email}
+
+
+
+
${user.storage_used_gb} GB / ${user.storage_quota_gb} GB (${percentage}%)
+
+
+ `;
+ userQuotaList.appendChild(quotaItem);
+ });
+
+ attachButtonListeners();
+
+ } catch (error) {
+ console.error('Error fetching users:', error);
+ userQuotaList.innerHTML = 'Error loading user quotas.
';
+ }
+ }
+
+ function attachButtonListeners() {
+ document.querySelectorAll('.delete-user-btn').forEach(button => {
+ button.addEventListener('click', async (event) => {
+ const email = event.target.dataset.email;
+ if (confirm(`Are you sure you want to delete user ${email}?`)) {
+ try {
+ const response = await fetch(`/api/users/${email}`, {
+ method: 'DELETE',
+ });
+ if (!response.ok) {
+ const errorData = await response.json();
+ throw new Error(errorData.error || `HTTP error! status: ${response.status}`);
+ }
+ alert(`User ${email} deleted successfully.`);
+ fetchAndRenderUsers();
+ } catch (error) {
+ console.error('Error deleting user:', error);
+ alert(`Failed to delete user: ${error.message}`);
+ }
+ }
+ });
+ });
+
+ document.querySelectorAll('.delete-team-btn').forEach(button => {
+ button.addEventListener('click', async (event) => {
+ const parentEmail = event.target.dataset.parentEmail;
+ if (confirm(`Are you sure you want to delete the team managed by ${parentEmail}? This will delete all users created by this account.`)) {
+ try {
+ const response = await fetch(`/api/teams/${parentEmail}`, {
+ method: 'DELETE',
+ });
+ if (!response.ok) {
+ const errorData = await response.json();
+ throw new Error(errorData.error || `HTTP error! status: ${response.status}`);
+ }
+ alert(`Team managed by ${parentEmail} and associated users deleted successfully.`);
+ fetchAndRenderUsers();
+ } catch (error) {
+ console.error('Error deleting team:', error);
+ alert(`Failed to delete team: ${error.message}`);
+ }
+ }
+ });
+ });
+ }
+
+ fetchAndRenderUsers();
+});
diff --git a/retoors/templates/layouts/dashboard.html b/retoors/templates/layouts/dashboard.html
new file mode 100644
index 0000000..94d2e69
--- /dev/null
+++ b/retoors/templates/layouts/dashboard.html
@@ -0,0 +1,50 @@
+{% extends "layouts/base.html" %}
+
+{% block head %}
+
+ {% block dashboard_head %}{% endblock %}
+{% endblock %}
+
+{% block content %}
+
+
+
+
+
+
+ {% block dashboard_content %}{% endblock %}
+
+
+{% endblock %}
diff --git a/retoors/templates/pages/add_user.html b/retoors/templates/pages/add_user.html
new file mode 100644
index 0000000..8ea4d36
--- /dev/null
+++ b/retoors/templates/pages/add_user.html
@@ -0,0 +1,55 @@
+{% extends "layouts/dashboard.html" %}
+
+{% block title %}Add New User - Retoor's Cloud Solutions{% endblock %}
+
+{% block dashboard_head %}
+
+{% endblock %}
+
+{% block page_title %}Add New User{% endblock %}
+
+{% block dashboard_content %}
+
+
+ Create New User Account
+ Add a new user to your team.
+
+ {% if errors %}
+
+
+ {% for field, error in errors.items() %}
+ - {{ error }}
+ {% endfor %}
+
+
+ {% endif %}
+
+
+
+
+{% endblock %}
diff --git a/retoors/templates/pages/edit_user.html b/retoors/templates/pages/edit_user.html
new file mode 100644
index 0000000..f66c85e
--- /dev/null
+++ b/retoors/templates/pages/edit_user.html
@@ -0,0 +1,63 @@
+{% extends "layouts/dashboard.html" %}
+
+{% block title %}Edit User - Retoor's Cloud Solutions{% endblock %}
+
+{% block dashboard_head %}
+
+{% endblock %}
+
+{% block page_title %}Edit User Quota{% endblock %}
+
+{% block dashboard_content %}
+
+
+ Edit User Storage Quota
+ Adjust storage quota for {{ user_data.full_name }} ({{ user_data.email }}).
+
+ {% if errors %}
+
+
+ {% for field, error in errors.items() %}
+ - {{ error }}
+ {% endfor %}
+
+
+ {% endif %}
+
+ {% if success_message %}
+
+ {{ success_message }}
+
+ {% endif %}
+
+
+
+
Current Usage:
+
+
+ {{ user_data.storage_used_gb }} GB / {{ user_data.storage_quota_gb }} GB
+
+
+
+
{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used
+
+
+
+
+
+
+{% endblock %}
diff --git a/retoors/templates/pages/user_details.html b/retoors/templates/pages/user_details.html
new file mode 100644
index 0000000..7ef52fe
--- /dev/null
+++ b/retoors/templates/pages/user_details.html
@@ -0,0 +1,55 @@
+{% extends "layouts/dashboard.html" %}
+
+{% block title %}User Details - Retoor's Cloud Solutions{% endblock %}
+
+{% block dashboard_head %}
+
+{% endblock %}
+
+{% block page_title %}User Details{% endblock %}
+
+{% block dashboard_actions %}
+ Edit Quota
+{% endblock %}
+
+{% block dashboard_content %}
+
+
+ {{ user_data.full_name }}
+ {{ user_data.email }}
+
+
+
+
Storage Usage:
+
+
+ {{ user_data.storage_used_gb }} GB / {{ user_data.storage_quota_gb }} GB
+
+
+
+
{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used
+
+
+
+
+
+
+{% endblock %}
diff --git a/retoors/templates/pages/users.html b/retoors/templates/pages/users.html
new file mode 100644
index 0000000..db89119
--- /dev/null
+++ b/retoors/templates/pages/users.html
@@ -0,0 +1,34 @@
+{% extends "layouts/dashboard.html" %}
+
+{% block title %}User Management - Retoor's Cloud Solutions{% endblock %}
+
+{% block dashboard_head %}
+
+{% endblock %}
+
+{% block page_title %}Users{% endblock %}
+
+{% block dashboard_actions %}
+ + Add New User
+{% endblock %}
+
+{% block dashboard_content %}
+{% if success_message %}
+
+ {{ success_message }}
+
+{% endif %}
+
+{% if error_message %}
+
+ {{ error_message }}
+
+{% endif %}
+
+
+
+
+{% endblock %}
diff --git a/tests/test_file_browser.py b/tests/test_file_browser.py
new file mode 100644
index 0000000..91f7ea5
--- /dev/null
+++ b/tests/test_file_browser.py
@@ -0,0 +1,482 @@
+import pytest
+from unittest.mock import MagicMock
+from pathlib import Path
+import json
+import datetime
+import aiohttp
+from aiohttp.test_utils import TestClient
+from aiohttp_session import setup as setup_session
+from aiohttp_session.cookie_storage import EncryptedCookieStorage
+from retoors.helpers.env_manager import get_or_create_session_secret_key
+
+# Assuming the FileService is in retoors/services/file_service.py
+# and the FileBrowserView is in retoors/views/site.py
+
+@pytest.fixture
+def temp_user_files_dir(tmp_path):
+ """Fixture to create a temporary directory for user files."""
+ user_files_dir = tmp_path / "user_files"
+ user_files_dir.mkdir()
+ return user_files_dir
+
+@pytest.fixture
+def temp_users_json(tmp_path):
+ """Fixture to create a temporary users.json file."""
+ users_json_path = tmp_path / "users.json"
+ initial_users_data = {
+ "test@example.com": {
+ "full_name": "Test User",
+ "email": "test@example.com",
+ "password": "hashed_password",
+ "storage_quota_gb": 10,
+ "storage_used_gb": 0,
+ "parent_email": None,
+ "shared_items": {}
+ },
+ "child@example.com": {
+ "full_name": "Child User",
+ "email": "child@example.com",
+ "password": "hashed_password",
+ "storage_quota_gb": 5,
+ "storage_used_gb": 0,
+ "parent_email": "test@example.com",
+ "shared_items": {}
+ }
+ }
+ with open(users_json_path, "w") as f:
+ json.dump(initial_users_data, f)
+ return users_json_path
+
+@pytest.fixture
+def file_service_instance(temp_user_files_dir, temp_users_json):
+ """Fixture to provide a FileService instance with temporary directories."""
+ from retoors.services.file_service import FileService
+ return FileService(temp_user_files_dir, temp_users_json)
+
+@pytest.fixture
+async def logged_in_client(aiohttp_client, create_test_app, mocker):
+ """Fixture to provide an aiohttp client with a logged-in user."""
+ app = create_test_app
+ client = await aiohttp_client(app)
+
+ user_service = app["user_service"]
+
+ def mock_create_user(full_name, email, password, parent_email=None):
+ return {
+ "full_name": full_name,
+ "email": email,
+ "password": "hashed_password",
+ "storage_quota_gb": 10,
+ "storage_used_gb": 0,
+ "parent_email": parent_email,
+ "shared_items": {}
+ }
+
+ def mock_authenticate_user(email, password):
+ return {
+ "email": email,
+ "full_name": "Test User",
+ "is_admin": False,
+ "storage_quota_gb": 10,
+ "storage_used_gb": 0
+ }
+
+ def mock_get_user_by_email(email):
+ return {
+ "email": email,
+ "full_name": "Test User",
+ "is_admin": False,
+ "storage_quota_gb": 10,
+ "storage_used_gb": 0
+ }
+
+ mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
+ mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
+ mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
+
+ await client.post(
+ "/register",
+ data={
+ "full_name": "Test User",
+ "email": "test@example.com",
+ "password": "password",
+ "confirm_password": "password",
+ },
+ )
+ await client.post(
+ "/login", data={"email": "test@example.com", "password": "password"}
+ )
+
+ return client
+
+@pytest.fixture
+async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
+ """Fixture to provide an aiohttp client with a logged-in admin user."""
+ app = create_test_app
+ client = await aiohttp_client(app)
+
+ user_service = app["user_service"]
+
+ def mock_create_user(full_name, email, password, parent_email=None):
+ return {
+ "full_name": full_name,
+ "email": email,
+ "password": "hashed_password",
+ "storage_quota_gb": 100,
+ "storage_used_gb": 0,
+ "parent_email": parent_email,
+ "shared_items": {}
+ }
+
+ def mock_authenticate_user(email, password):
+ return {
+ "email": email,
+ "full_name": "Admin User",
+ "is_admin": True,
+ "storage_quota_gb": 100,
+ "storage_used_gb": 0
+ }
+
+ def mock_get_user_by_email(email):
+ return {
+ "email": email,
+ "full_name": "Admin User",
+ "is_admin": True,
+ "storage_quota_gb": 100,
+ "storage_used_gb": 0
+ }
+
+ mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
+ mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
+ mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
+
+ await client.post(
+ "/register",
+ data={
+ "full_name": "Admin User",
+ "email": "admin@example.com",
+ "password": "password",
+ "confirm_password": "password",
+ },
+ )
+ await client.post(
+ "/login", data={"email": "admin@example.com", "password": "password"}
+ )
+ return client
+
+
+# --- FileService Tests ---
+
+@pytest.mark.asyncio
+async def test_file_service_list_files_empty(file_service_instance):
+ user_email = "test@example.com"
+ files = await file_service_instance.list_files(user_email)
+ assert files == []
+
+@pytest.mark.asyncio
+async def test_file_service_create_folder(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ folder_name = "my_new_folder"
+ success = await file_service_instance.create_folder(user_email, folder_name)
+ assert success
+ expected_path = temp_user_files_dir / user_email / folder_name
+ assert expected_path.is_dir()
+
+@pytest.mark.asyncio
+async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ folder_name = "existing_folder"
+ (temp_user_files_dir / user_email).mkdir(parents=True)
+ (temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
+ success = await file_service_instance.create_folder(user_email, folder_name)
+ assert not success # Should return False if folder already exists
+
+@pytest.mark.asyncio
+async def test_file_service_upload_file(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ file_name = "document.txt"
+ file_content = b"Hello, world!"
+ success = await file_service_instance.upload_file(user_email, file_name, file_content)
+ assert success
+ expected_path = temp_user_files_dir / user_email / file_name
+ assert expected_path.is_file()
+ assert expected_path.read_bytes() == file_content
+
+@pytest.mark.asyncio
+async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ await file_service_instance.create_folder(user_email, "folder1")
+ await file_service_instance.upload_file(user_email, "file1.txt", b"content1")
+ await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2")
+
+ files = await file_service_instance.list_files(user_email)
+ assert len(files) == 2
+ assert any(f["name"] == "folder1" and f["is_dir"] for f in files)
+ assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files)
+
+ files_in_folder1 = await file_service_instance.list_files(user_email, "folder1")
+ assert len(files_in_folder1) == 1
+ assert files_in_folder1[0]["name"] == "file2.txt"
+
+@pytest.mark.asyncio
+async def test_file_service_download_file(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ file_name = "download.txt"
+ file_content = b"Downloadable content."
+ await file_service_instance.upload_file(user_email, file_name, file_content)
+
+ content, name = await file_service_instance.download_file(user_email, file_name)
+ assert content == file_content
+ assert name == file_name
+
+@pytest.mark.asyncio
+async def test_file_service_download_file_not_found(file_service_instance):
+ user_email = "test@example.com"
+ content = await file_service_instance.download_file(user_email, "nonexistent.txt")
+ assert content is None
+
+@pytest.mark.asyncio
+async def test_file_service_delete_file(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ file_name = "to_delete.txt"
+ (temp_user_files_dir / user_email).mkdir(exist_ok=True)
+ (temp_user_files_dir / user_email / file_name).write_bytes(b"delete me")
+
+ success = await file_service_instance.delete_item(user_email, file_name)
+ assert success
+ assert not (temp_user_files_dir / user_email / file_name).exists()
+
+@pytest.mark.asyncio
+async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ folder_name = "folder_to_delete"
+ (temp_user_files_dir / user_email).mkdir(parents=True)
+ (temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
+ (temp_user_files_dir / user_email / folder_name / "nested.txt").write_bytes(b"nested")
+
+ success = await file_service_instance.delete_item(user_email, folder_name)
+ assert success
+ assert not (temp_user_files_dir / user_email / folder_name).exists()
+
+@pytest.mark.asyncio
+async def test_file_service_delete_nonexistent(file_service_instance):
+ user_email = "test@example.com"
+ success = await file_service_instance.delete_item(user_email, "nonexistent_item")
+ assert not success
+
+@pytest.mark.asyncio
+async def test_file_service_generate_share_link(file_service_instance):
+ user_email = "test@example.com"
+ file_path = "shareable.txt"
+ await file_service_instance.upload_file(user_email, file_path, b"share content")
+
+ share_id = await file_service_instance.generate_share_link(user_email, file_path)
+ assert share_id is not None
+ assert isinstance(share_id, str)
+
+ shared_item = await file_service_instance.get_shared_item(share_id)
+ assert shared_item is not None
+ assert shared_item["user_email"] == user_email
+ assert shared_item["item_path"] == file_path
+
+@pytest.mark.asyncio
+async def test_file_service_get_shared_file_content(file_service_instance):
+ user_email = "test@example.com"
+ file_path = "shared_file.txt"
+ content_to_share = b"This is shared content."
+ await file_service_instance.upload_file(user_email, file_path, content_to_share)
+ share_id = await file_service_instance.generate_share_link(user_email, file_path)
+
+ retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id)
+ assert retrieved_content == content_to_share
+ assert filename == "shared_file.txt"
+
+@pytest.mark.asyncio
+async def test_file_service_get_shared_folder_content(file_service_instance):
+ user_email = "test@example.com"
+ folder_path = "shared_folder"
+ await file_service_instance.create_folder(user_email, folder_path)
+ await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content")
+ share_id = await file_service_instance.generate_share_link(user_email, folder_path)
+
+ retrieved_content = await file_service_instance.get_shared_folder_content(share_id)
+ assert len(retrieved_content) == 1
+ assert retrieved_content[0]["name"] == "nested.txt"
+
+@pytest.mark.asyncio
+async def test_file_service_shared_link_expiry(file_service_instance, mocker):
+ user_email = "test@example.com"
+ file_path = "expiring_file.txt"
+ await file_service_instance.upload_file(user_email, file_path, b"expiring content")
+ share_id = await file_service_instance.generate_share_link(user_email, file_path)
+
+ # Mock datetime to simulate an expired link
+ future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
+ mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime))
+ datetime.datetime.now.return_value = future_time
+
+ shared_item = await file_service_instance.get_shared_item(share_id)
+ assert shared_item is None
+
+
+# --- FileBrowserView Tests ---
+
+@pytest.mark.asyncio
+async def test_file_browser_get_unauthorized(client: TestClient):
+ resp = await client.get("/files", allow_redirects=False)
+ assert resp.status == 302 # Redirect to login
+
+@pytest.mark.asyncio
+async def test_file_browser_get_authorized_empty(logged_in_client: TestClient):
+ resp = await logged_in_client.get("/files")
+ assert resp.status == 200
+ text = await resp.text()
+ assert "No files found in this directory." in text
+
+@pytest.mark.asyncio
+async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance):
+ user_email = "test@example.com"
+ await file_service_instance.create_folder(user_email, "my_folder")
+ await file_service_instance.upload_file(user_email, "my_file.txt", b"some content")
+
+ resp = await logged_in_client.get("/files")
+ assert resp.status == 200
+ text = await resp.text()
+ assert "my_folder" in text
+ assert "my_file.txt" in text
+
+@pytest.mark.asyncio
+async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False)
+ assert resp.status == 302 # Redirect
+ assert resp.headers["Location"].startswith("/files")
+
+ expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
+ assert expected_path.is_dir()
+
+@pytest.mark.asyncio
+async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ file_content = b"Uploaded content from web."
+
+ from io import BytesIO
+ data = aiohttp.FormData()
+ data.add_field('file',
+ BytesIO(file_content),
+ filename='uploaded.txt',
+ content_type='text/plain')
+
+ resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
+ assert resp.status == 302 # Redirect
+ assert resp.headers["Location"].startswith("/files")
+
+ expected_path = temp_user_files_dir / user_email / "uploaded.txt"
+ assert expected_path.is_file()
+ assert expected_path.read_bytes() == file_content
+
+@pytest.mark.asyncio
+async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance):
+ user_email = "test@example.com"
+ file_name = "web_download.txt"
+ file_content = b"Content to be downloaded via web."
+ await file_service_instance.upload_file(user_email, file_name, file_content)
+
+ resp = await logged_in_client.get(f"/files/download/{file_name}")
+ assert resp.status == 200
+ assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}"
+ assert await resp.read() == file_content
+
+@pytest.mark.asyncio
+async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
+ resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
+ assert resp.status == 404
+
+@pytest.mark.asyncio
+async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ file_name = "web_delete.txt"
+ await file_service_instance.upload_file(user_email, file_name, b"delete this")
+
+ expected_path = temp_user_files_dir / user_email / file_name
+ assert expected_path.is_file()
+
+ resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
+ assert resp.status == 302 # Redirect
+ assert resp.headers["Location"].startswith("/files")
+ assert not expected_path.is_file()
+
+@pytest.mark.asyncio
+async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
+ user_email = "test@example.com"
+ folder_name = "web_delete_folder"
+ await file_service_instance.create_folder(user_email, folder_name)
+ await file_service_instance.upload_file(user_email, f"{folder_name}/nested.txt", b"nested")
+
+ expected_path = temp_user_files_dir / user_email / folder_name
+ assert expected_path.is_dir()
+
+ resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False)
+ assert resp.status == 302 # Redirect
+ assert resp.headers["Location"].startswith("/files")
+ assert not expected_path.is_dir()
+
+@pytest.mark.asyncio
+async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance):
+ user_email = "test@example.com"
+ file_name = "web_share.txt"
+ await file_service_instance.upload_file(user_email, file_name, b"shareable content")
+
+ resp = await logged_in_client.post(f"/files/share/{file_name}")
+ assert resp.status == 200
+ data = await resp.json()
+ assert "share_link" in data
+ assert "http" in data["share_link"] # Check if it's a URL
+
+ # Verify the shared item can be retrieved
+ # The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID
+ share_id = data["share_link"].split("/")[-1]
+ shared_item = await file_service_instance.get_shared_item(share_id)
+ assert shared_item is not None
+ assert shared_item["item_path"] == file_name
+
+@pytest.fixture
+def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
+ """Fixture to create a test aiohttp application with mocked services."""
+ from aiohttp import web
+ from retoors.middlewares import user_middleware, error_middleware
+ from retoors.services.user_service import UserService
+ from retoors.routes import setup_routes
+ import aiohttp_jinja2
+ import jinja2
+
+ app = web.Application()
+
+ # Setup session for the test app
+ project_root = Path(__file__).parent.parent
+ env_file_path = project_root / ".env"
+ secret_key = get_or_create_session_secret_key(env_file_path)
+ setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
+
+ app.middlewares.append(error_middleware)
+ app.middlewares.append(user_middleware)
+
+ # Mock UserService
+ mock_user_service = mocker.MagicMock(spec=UserService)
+
+ # Mock scheduler
+ mock_scheduler = mocker.MagicMock()
+ mock_scheduler.spawn = mocker.AsyncMock()
+ mock_scheduler.close = mocker.AsyncMock()
+
+ app["user_service"] = mock_user_service
+ app["file_service"] = file_service_instance
+ app["scheduler"] = mock_scheduler
+
+ # Setup Jinja2 for templates
+ base_path = Path(__file__).parent.parent / "retoors"
+ templates_path = base_path / "templates"
+ aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
+
+ setup_routes(app)
+ return app