Upate.
This commit is contained in:
parent
a6a19b8438
commit
6248a2086c
151
retoors/services/file_service.py
Normal file
151
retoors/services/file_service.py
Normal file
@ -0,0 +1,151 @@
|
|||||||
|
import aiofiles
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
import shutil
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
class FileService:
|
||||||
|
def __init__(self, base_dir: Path, users_data_path: Path):
|
||||||
|
self.base_dir = base_dir
|
||||||
|
self.users_data_path = users_data_path
|
||||||
|
self.base_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
async def _load_users_data(self):
|
||||||
|
"""Loads user data from the JSON file."""
|
||||||
|
if not self.users_data_path.exists():
|
||||||
|
return {}
|
||||||
|
async with aiofiles.open(self.users_data_path, mode="r") as f:
|
||||||
|
content = await f.read()
|
||||||
|
return json.loads(content) if content else {}
|
||||||
|
|
||||||
|
async def _save_users_data(self, data):
|
||||||
|
"""Saves user data to the JSON file."""
|
||||||
|
async with aiofiles.open(self.users_data_path, mode="w") as f:
|
||||||
|
await f.write(json.dumps(data, indent=4))
|
||||||
|
|
||||||
|
def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
|
||||||
|
"""Constructs the absolute path for a user's file or directory."""
|
||||||
|
user_dir = self.base_dir / user_email
|
||||||
|
return user_dir / relative_path
|
||||||
|
|
||||||
|
async def list_files(self, user_email: str, path: str = "") -> list:
|
||||||
|
"""Lists files and directories for a given user within a specified path."""
|
||||||
|
user_path = self._get_user_file_path(user_email, path)
|
||||||
|
if not user_path.is_dir():
|
||||||
|
return []
|
||||||
|
|
||||||
|
files_list = []
|
||||||
|
for item in user_path.iterdir():
|
||||||
|
if item.name.startswith('.'): # Ignore hidden files/directories
|
||||||
|
continue
|
||||||
|
file_info = {
|
||||||
|
"name": item.name,
|
||||||
|
"is_dir": item.is_dir(),
|
||||||
|
"path": str(item.relative_to(self._get_user_file_path(user_email))),
|
||||||
|
"size": item.stat().st_size if item.is_file() else 0,
|
||||||
|
"last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
|
||||||
|
}
|
||||||
|
files_list.append(file_info)
|
||||||
|
return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
|
||||||
|
|
||||||
|
async def create_folder(self, user_email: str, folder_path: str) -> bool:
|
||||||
|
"""Creates a new folder for the user."""
|
||||||
|
full_path = self._get_user_file_path(user_email, folder_path)
|
||||||
|
if full_path.exists():
|
||||||
|
return False # Folder already exists
|
||||||
|
full_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
|
||||||
|
"""Uploads a file for the user."""
|
||||||
|
full_path = self._get_user_file_path(user_email, file_path)
|
||||||
|
full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
|
||||||
|
async with aiofiles.open(full_path, mode="wb") as f:
|
||||||
|
await f.write(content)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
|
||||||
|
"""Downloads a file for the user."""
|
||||||
|
full_path = self._get_user_file_path(user_email, file_path)
|
||||||
|
if full_path.is_file():
|
||||||
|
async with aiofiles.open(full_path, mode="rb") as f:
|
||||||
|
content = await f.read()
|
||||||
|
return content, full_path.name
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def delete_item(self, user_email: str, item_path: str) -> bool:
|
||||||
|
"""Deletes a file or folder for the user."""
|
||||||
|
full_path = self._get_user_file_path(user_email, item_path)
|
||||||
|
if not full_path.exists():
|
||||||
|
return False
|
||||||
|
|
||||||
|
if full_path.is_file():
|
||||||
|
full_path.unlink()
|
||||||
|
elif full_path.is_dir():
|
||||||
|
shutil.rmtree(full_path)
|
||||||
|
return True
|
||||||
|
|
||||||
|
async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
|
||||||
|
"""Generates a shareable link for a file or folder."""
|
||||||
|
users_data = await self._load_users_data()
|
||||||
|
user = users_data.get(user_email)
|
||||||
|
if not user:
|
||||||
|
return None
|
||||||
|
|
||||||
|
full_path = self._get_user_file_path(user_email, item_path)
|
||||||
|
if not full_path.exists():
|
||||||
|
return None
|
||||||
|
|
||||||
|
share_id = str(uuid.uuid4())
|
||||||
|
if "shared_items" not in user:
|
||||||
|
user["shared_items"] = {}
|
||||||
|
user["shared_items"][share_id] = {
|
||||||
|
"user_email": user_email,
|
||||||
|
"item_path": item_path,
|
||||||
|
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||||||
|
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
|
||||||
|
}
|
||||||
|
await self._save_users_data(users_data)
|
||||||
|
return share_id
|
||||||
|
|
||||||
|
async def get_shared_item(self, share_id: str) -> dict | None:
|
||||||
|
"""Retrieves information about a shared item."""
|
||||||
|
users_data = await self._load_users_data()
|
||||||
|
for user_email, user_info in users_data.items():
|
||||||
|
if "shared_items" in user_info and share_id in user_info["shared_items"]:
|
||||||
|
shared_item = user_info["shared_items"][share_id]
|
||||||
|
expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
|
||||||
|
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
|
||||||
|
return shared_item
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None:
|
||||||
|
"""Retrieves the content of a shared file."""
|
||||||
|
shared_item = await self.get_shared_item(share_id)
|
||||||
|
if not shared_item:
|
||||||
|
return None
|
||||||
|
|
||||||
|
user_email = shared_item["user_email"]
|
||||||
|
item_path = shared_item["item_path"]
|
||||||
|
full_path = self._get_user_file_path(user_email, item_path)
|
||||||
|
|
||||||
|
if full_path.is_file():
|
||||||
|
async with aiofiles.open(full_path, mode="rb") as f:
|
||||||
|
content = await f.read()
|
||||||
|
return content, full_path.name
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_shared_folder_content(self, share_id: str) -> list | None:
|
||||||
|
"""Retrieves the content of a shared folder."""
|
||||||
|
shared_item = await self.get_shared_item(share_id)
|
||||||
|
if not shared_item:
|
||||||
|
return None
|
||||||
|
|
||||||
|
user_email = shared_item["user_email"]
|
||||||
|
item_path = shared_item["item_path"]
|
||||||
|
full_path = self._get_user_file_path(user_email, item_path)
|
||||||
|
|
||||||
|
if full_path.is_dir():
|
||||||
|
return await self.list_files(user_email, item_path)
|
||||||
|
return None
|
||||||
55
retoors/static/js/components/order_form.js
Normal file
55
retoors/static/js/components/order_form.js
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
document.addEventListener('DOMContentLoaded', () => {
|
||||||
|
const mainOrderForm = document.querySelector('.order-form');
|
||||||
|
const customSlider = document.querySelector('custom-slider');
|
||||||
|
const totalPriceSpan = document.getElementById('total_price');
|
||||||
|
|
||||||
|
if (!customSlider || !totalPriceSpan) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pricePerGb = parseFloat(customSlider.dataset.pricePerGb);
|
||||||
|
|
||||||
|
function updateTotalPrice() {
|
||||||
|
const storageAmount = parseFloat(customSlider.value);
|
||||||
|
const total = (storageAmount * pricePerGb).toFixed(2);
|
||||||
|
totalPriceSpan.textContent = `$${total}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
updateTotalPrice();
|
||||||
|
customSlider.addEventListener('input', updateTotalPrice);
|
||||||
|
|
||||||
|
if (mainOrderForm) {
|
||||||
|
mainOrderForm.addEventListener('submit', async (event) => {
|
||||||
|
event.preventDefault();
|
||||||
|
|
||||||
|
const storageAmount = parseFloat(customSlider.value);
|
||||||
|
if (isNaN(storageAmount) || storageAmount <= 0) {
|
||||||
|
alert('Please select a valid storage amount.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const currentUserEmail = '{{ user.email }}';
|
||||||
|
|
||||||
|
const response = await fetch(`/api/users/${currentUserEmail}/quota`, {
|
||||||
|
method: 'PUT',
|
||||||
|
headers: {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
},
|
||||||
|
body: JSON.stringify({ new_quota_gb: storageAmount }),
|
||||||
|
});
|
||||||
|
|
||||||
|
const data = await response.json();
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(data.error || 'Failed to update quota.');
|
||||||
|
}
|
||||||
|
|
||||||
|
alert(data.message || 'Quota updated successfully!');
|
||||||
|
window.location.reload();
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error updating quota:', error);
|
||||||
|
alert(`Failed to update quota: ${error.message}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
99
retoors/static/js/components/users.js
Normal file
99
retoors/static/js/components/users.js
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
document.addEventListener('DOMContentLoaded', () => {
|
||||||
|
const userQuotaList = document.getElementById('user-quota-list');
|
||||||
|
|
||||||
|
async function fetchAndRenderUsers() {
|
||||||
|
try {
|
||||||
|
const response = await fetch('/api/users');
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`HTTP error! status: ${response.status}`);
|
||||||
|
}
|
||||||
|
const data = await response.json();
|
||||||
|
userQuotaList.innerHTML = '';
|
||||||
|
|
||||||
|
if (data.users.length === 0) {
|
||||||
|
userQuotaList.innerHTML = '<p style="text-align: center; padding: 40px;">No users found. Click "Add New User" to create one.</p>';
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
data.users.forEach(user => {
|
||||||
|
const quotaItem = document.createElement('div');
|
||||||
|
quotaItem.className = 'user-quota-item card';
|
||||||
|
const percentage = user.storage_quota_gb > 0 ? ((user.storage_used_gb / user.storage_quota_gb) * 100).toFixed(2) : 0;
|
||||||
|
|
||||||
|
quotaItem.innerHTML = `
|
||||||
|
<div class="user-info">
|
||||||
|
<h4>${user.full_name}</h4>
|
||||||
|
<p>${user.email}</p>
|
||||||
|
</div>
|
||||||
|
<div class="quota-details">
|
||||||
|
<div class="storage-gauge small">
|
||||||
|
<div class="storage-gauge-bar" style="width: ${percentage}%;"></div>
|
||||||
|
</div>
|
||||||
|
<p>${user.storage_used_gb} GB / ${user.storage_quota_gb} GB (${percentage}%)</p>
|
||||||
|
</div>
|
||||||
|
<div class="quota-actions">
|
||||||
|
<a href="/users/${user.email}/details" class="btn-outline">View Details</a>
|
||||||
|
<a href="/users/${user.email}/edit" class="btn-outline">Edit Quota</a>
|
||||||
|
<button class="btn-outline delete-user-btn" data-email="${user.email}">Delete User</button>
|
||||||
|
${user.parent_email === null ? `<button class="btn-outline delete-team-btn" data-parent-email="${user.email}">Delete Team</button>` : ''}
|
||||||
|
</div>
|
||||||
|
`;
|
||||||
|
userQuotaList.appendChild(quotaItem);
|
||||||
|
});
|
||||||
|
|
||||||
|
attachButtonListeners();
|
||||||
|
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error fetching users:', error);
|
||||||
|
userQuotaList.innerHTML = '<p>Error loading user quotas.</p>';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function attachButtonListeners() {
|
||||||
|
document.querySelectorAll('.delete-user-btn').forEach(button => {
|
||||||
|
button.addEventListener('click', async (event) => {
|
||||||
|
const email = event.target.dataset.email;
|
||||||
|
if (confirm(`Are you sure you want to delete user ${email}?`)) {
|
||||||
|
try {
|
||||||
|
const response = await fetch(`/api/users/${email}`, {
|
||||||
|
method: 'DELETE',
|
||||||
|
});
|
||||||
|
if (!response.ok) {
|
||||||
|
const errorData = await response.json();
|
||||||
|
throw new Error(errorData.error || `HTTP error! status: ${response.status}`);
|
||||||
|
}
|
||||||
|
alert(`User ${email} deleted successfully.`);
|
||||||
|
fetchAndRenderUsers();
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error deleting user:', error);
|
||||||
|
alert(`Failed to delete user: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
document.querySelectorAll('.delete-team-btn').forEach(button => {
|
||||||
|
button.addEventListener('click', async (event) => {
|
||||||
|
const parentEmail = event.target.dataset.parentEmail;
|
||||||
|
if (confirm(`Are you sure you want to delete the team managed by ${parentEmail}? This will delete all users created by this account.`)) {
|
||||||
|
try {
|
||||||
|
const response = await fetch(`/api/teams/${parentEmail}`, {
|
||||||
|
method: 'DELETE',
|
||||||
|
});
|
||||||
|
if (!response.ok) {
|
||||||
|
const errorData = await response.json();
|
||||||
|
throw new Error(errorData.error || `HTTP error! status: ${response.status}`);
|
||||||
|
}
|
||||||
|
alert(`Team managed by ${parentEmail} and associated users deleted successfully.`);
|
||||||
|
fetchAndRenderUsers();
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error deleting team:', error);
|
||||||
|
alert(`Failed to delete team: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchAndRenderUsers();
|
||||||
|
});
|
||||||
50
retoors/templates/layouts/dashboard.html
Normal file
50
retoors/templates/layouts/dashboard.html
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
{% extends "layouts/base.html" %}
|
||||||
|
|
||||||
|
{% block head %}
|
||||||
|
<link rel="stylesheet" href="/static/css/components/dashboard.css">
|
||||||
|
{% block dashboard_head %}{% endblock %}
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<main class="dashboard-layout">
|
||||||
|
<aside class="dashboard-sidebar">
|
||||||
|
<div class="sidebar-menu">
|
||||||
|
<ul>
|
||||||
|
<li><a href="/files" {% if active_page == 'files' %}class="active"{% endif %}><img src="/static/images/icon-families.svg" alt="My Files Icon" class="icon"> My Files</a></li>
|
||||||
|
<li><a href="/shared" {% if active_page == 'shared' %}class="active"{% endif %}><img src="/static/images/icon-professionals.svg" alt="Shared Icon" class="icon"> Shared with me</a></li>
|
||||||
|
<li><a href="/recent" {% if active_page == 'recent' %}class="active"{% endif %}><img src="/static/images/icon-students.svg" alt="Recent Icon" class="icon"> Recent</a></li>
|
||||||
|
<li><a href="/favorites" {% if active_page == 'favorites' %}class="active"{% endif %}><img src="/static/images/icon-families.svg" alt="Favorites Icon" class="icon"> Favorites</a></li>
|
||||||
|
<li><a href="/trash" {% if active_page == 'trash' %}class="active"{% endif %}><img src="/static/images/icon-professionals.svg" alt="Trash Icon" class="icon"> Trash</a></li>
|
||||||
|
<li><a href="/users" {% if active_page == 'users' %}class="active"{% endif %}><img src="/static/images/icon-students.svg" alt="Users Icon" class="icon"> Users</a></li>
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="sidebar-storage-quota">
|
||||||
|
<h4>Storage Used</h4>
|
||||||
|
<div class="storage-gauge">
|
||||||
|
<div class="storage-gauge-bar" style="width: {{ (user.storage_used_gb / user.storage_quota_gb) * 100 }}%;"></div>
|
||||||
|
</div>
|
||||||
|
<div class="storage-info">
|
||||||
|
<span>{{ user.storage_used_gb }} GB used of {{ user.storage_quota_gb }} GB</span>
|
||||||
|
<span>{{ ((user.storage_used_gb / user.storage_quota_gb) * 100)|round(2) }}%</span>
|
||||||
|
</div>
|
||||||
|
<a href="/order" class="btn-primary" style="margin-top: 20px; display: block; text-align: center;">Manage Quota</a>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="sidebar-help">
|
||||||
|
<a href="/support" class="btn-outline" style="display: block; text-align: center;">Help & Support</a>
|
||||||
|
</div>
|
||||||
|
</aside>
|
||||||
|
|
||||||
|
<section class="dashboard-content">
|
||||||
|
<div class="dashboard-content-header">
|
||||||
|
<h2>{% block page_title %}Dashboard{% endblock %}</h2>
|
||||||
|
<div class="dashboard-actions">
|
||||||
|
{% block dashboard_actions %}{% endblock %}
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{% block dashboard_content %}{% endblock %}
|
||||||
|
</section>
|
||||||
|
</main>
|
||||||
|
{% endblock %}
|
||||||
55
retoors/templates/pages/add_user.html
Normal file
55
retoors/templates/pages/add_user.html
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
{% extends "layouts/dashboard.html" %}
|
||||||
|
|
||||||
|
{% block title %}Add New User - Retoor's Cloud Solutions{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_head %}
|
||||||
|
<link rel="stylesheet" href="/static/css/components/order.css">
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block page_title %}Add New User{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_content %}
|
||||||
|
<div class="order-management-layout">
|
||||||
|
<section class="quota-overview-card">
|
||||||
|
<h2>Create New User Account</h2>
|
||||||
|
<p class="subtitle">Add a new user to your team.</p>
|
||||||
|
|
||||||
|
{% if errors %}
|
||||||
|
<div class="alert alert-error">
|
||||||
|
<ul>
|
||||||
|
{% for field, error in errors.items() %}
|
||||||
|
<li>{{ error }}</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<form action="/users/add" method="post" class="order-form">
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="full_name">Full Name</label>
|
||||||
|
<input type="text" id="full_name" name="full_name" required class="form-input" value="{{ form_data.full_name if form_data else '' }}">
|
||||||
|
</div>
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="email">Email</label>
|
||||||
|
<input type="email" id="email" name="email" required class="form-input" value="{{ form_data.email if form_data else '' }}">
|
||||||
|
</div>
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="password">Password</label>
|
||||||
|
<input type="password" id="password" name="password" required class="form-input">
|
||||||
|
</div>
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="confirm_password">Confirm Password</label>
|
||||||
|
<input type="password" id="confirm_password" name="confirm_password" required class="form-input">
|
||||||
|
</div>
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="storage_quota_gb">Storage Quota (GB)</label>
|
||||||
|
<input type="number" id="storage_quota_gb" name="storage_quota_gb" min="1" value="10" required class="form-input">
|
||||||
|
</div>
|
||||||
|
<div style="display: flex; gap: 10px; margin-top: 20px;">
|
||||||
|
<button type="submit" class="btn-primary">Add User</button>
|
||||||
|
<a href="/users" class="btn-outline">Cancel</a>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
</section>
|
||||||
|
</div>
|
||||||
|
{% endblock %}
|
||||||
63
retoors/templates/pages/edit_user.html
Normal file
63
retoors/templates/pages/edit_user.html
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
{% extends "layouts/dashboard.html" %}
|
||||||
|
|
||||||
|
{% block title %}Edit User - Retoor's Cloud Solutions{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_head %}
|
||||||
|
<link rel="stylesheet" href="/static/css/components/order.css">
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block page_title %}Edit User Quota{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_content %}
|
||||||
|
<div class="order-management-layout">
|
||||||
|
<section class="quota-overview-card">
|
||||||
|
<h2>Edit User Storage Quota</h2>
|
||||||
|
<p class="subtitle">Adjust storage quota for {{ user_data.full_name }} ({{ user_data.email }}).</p>
|
||||||
|
|
||||||
|
{% if errors %}
|
||||||
|
<div class="alert alert-error">
|
||||||
|
<ul>
|
||||||
|
{% for field, error in errors.items() %}
|
||||||
|
<li>{{ error }}</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% if success_message %}
|
||||||
|
<div class="alert alert-success">
|
||||||
|
{{ success_message }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<div class="overview-grid">
|
||||||
|
<div class="total-storage-chart">
|
||||||
|
<h3>Current Usage:</h3>
|
||||||
|
<div class="donut-chart-container">
|
||||||
|
<div class="donut-chart-text">
|
||||||
|
<span class="used-gb">{{ user_data.storage_used_gb }} GB</span> / <span class="total-gb">{{ user_data.storage_quota_gb }} GB</span>
|
||||||
|
</div>
|
||||||
|
<div class="storage-gauge large">
|
||||||
|
<div class="storage-gauge-bar" style="width: {{ (user_data.storage_used_gb / user_data.storage_quota_gb * 100) if user_data.storage_quota_gb > 0 else 0 }}%;"></div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<p class="plan-details">{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="order-form-card">
|
||||||
|
<h3>Update Quota</h3>
|
||||||
|
<form action="/users/{{ user_data.email }}/edit" method="post" class="order-form">
|
||||||
|
<div class="form-group">
|
||||||
|
<label for="storage_quota_gb">Storage Quota (GB)</label>
|
||||||
|
<input type="number" id="storage_quota_gb" name="storage_quota_gb" min="1" value="{{ user_data.storage_quota_gb }}" required class="form-input">
|
||||||
|
</div>
|
||||||
|
<div style="display: flex; gap: 10px; margin-top: 20px;">
|
||||||
|
<button type="submit" class="btn-primary">Update Quota</button>
|
||||||
|
<a href="/users" class="btn-outline">Cancel</a>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
</div>
|
||||||
|
{% endblock %}
|
||||||
55
retoors/templates/pages/user_details.html
Normal file
55
retoors/templates/pages/user_details.html
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
{% extends "layouts/dashboard.html" %}
|
||||||
|
|
||||||
|
{% block title %}User Details - Retoor's Cloud Solutions{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_head %}
|
||||||
|
<link rel="stylesheet" href="/static/css/components/order.css">
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block page_title %}User Details{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_actions %}
|
||||||
|
<a href="/users/{{ user_data.email }}/edit" class="btn-primary">Edit Quota</a>
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_content %}
|
||||||
|
<div class="order-management-layout">
|
||||||
|
<section class="quota-overview-card">
|
||||||
|
<h2>{{ user_data.full_name }}</h2>
|
||||||
|
<p class="subtitle">{{ user_data.email }}</p>
|
||||||
|
|
||||||
|
<div class="overview-grid">
|
||||||
|
<div class="total-storage-chart">
|
||||||
|
<h3>Storage Usage:</h3>
|
||||||
|
<div class="donut-chart-container">
|
||||||
|
<div class="donut-chart-text">
|
||||||
|
<span class="used-gb">{{ user_data.storage_used_gb }} GB</span> / <span class="total-gb">{{ user_data.storage_quota_gb }} GB</span>
|
||||||
|
</div>
|
||||||
|
<div class="storage-gauge large">
|
||||||
|
<div class="storage-gauge-bar" style="width: {{ (user_data.storage_used_gb / user_data.storage_quota_gb * 100) if user_data.storage_quota_gb > 0 else 0 }}%;"></div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
<p class="plan-details">{{ ((user_data.storage_used_gb / user_data.storage_quota_gb) * 100)|round(2) if user_data.storage_quota_gb > 0 else 0 }}% used</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="order-form-card">
|
||||||
|
<h3>Account Information</h3>
|
||||||
|
<div style="margin-top: 20px;">
|
||||||
|
<p><strong>Full Name:</strong> {{ user_data.full_name }}</p>
|
||||||
|
<p><strong>Email:</strong> {{ user_data.email }}</p>
|
||||||
|
<p><strong>Storage Used:</strong> {{ user_data.storage_used_gb }} GB</p>
|
||||||
|
<p><strong>Storage Quota:</strong> {{ user_data.storage_quota_gb }} GB</p>
|
||||||
|
<p><strong>Parent Email:</strong> {{ user_data.parent_email if user_data.parent_email else 'N/A' }}</p>
|
||||||
|
</div>
|
||||||
|
<div style="display: flex; gap: 10px; margin-top: 30px;">
|
||||||
|
<a href="/users/{{ user_data.email }}/edit" class="btn-primary">Edit Quota</a>
|
||||||
|
<form action="/users/{{ user_data.email }}/delete" method="post" style="display: inline;" onsubmit="return confirm('Are you sure you want to delete this user?');">
|
||||||
|
<button type="submit" class="btn-danger">Delete User</button>
|
||||||
|
</form>
|
||||||
|
<a href="/users" class="btn-outline">Back to Users</a>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
</div>
|
||||||
|
{% endblock %}
|
||||||
34
retoors/templates/pages/users.html
Normal file
34
retoors/templates/pages/users.html
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
{% extends "layouts/dashboard.html" %}
|
||||||
|
|
||||||
|
{% block title %}User Management - Retoor's Cloud Solutions{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_head %}
|
||||||
|
<link rel="stylesheet" href="/static/css/components/order.css">
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block page_title %}Users{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_actions %}
|
||||||
|
<a href="/users/add" class="btn-primary">+ Add New User</a>
|
||||||
|
{% endblock %}
|
||||||
|
|
||||||
|
{% block dashboard_content %}
|
||||||
|
{% if success_message %}
|
||||||
|
<div class="alert alert-success">
|
||||||
|
{{ success_message }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
{% if error_message %}
|
||||||
|
<div class="alert alert-error">
|
||||||
|
{{ error_message }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<div class="user-quotas-section">
|
||||||
|
<div class="user-quota-list" id="user-quota-list">
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script src="/static/js/components/users.js"></script>
|
||||||
|
{% endblock %}
|
||||||
482
tests/test_file_browser.py
Normal file
482
tests/test_file_browser.py
Normal file
@ -0,0 +1,482 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
from pathlib import Path
|
||||||
|
import json
|
||||||
|
import datetime
|
||||||
|
import aiohttp
|
||||||
|
from aiohttp.test_utils import TestClient
|
||||||
|
from aiohttp_session import setup as setup_session
|
||||||
|
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
||||||
|
from retoors.helpers.env_manager import get_or_create_session_secret_key
|
||||||
|
|
||||||
|
# Assuming the FileService is in retoors/services/file_service.py
|
||||||
|
# and the FileBrowserView is in retoors/views/site.py
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_user_files_dir(tmp_path):
|
||||||
|
"""Fixture to create a temporary directory for user files."""
|
||||||
|
user_files_dir = tmp_path / "user_files"
|
||||||
|
user_files_dir.mkdir()
|
||||||
|
return user_files_dir
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def temp_users_json(tmp_path):
|
||||||
|
"""Fixture to create a temporary users.json file."""
|
||||||
|
users_json_path = tmp_path / "users.json"
|
||||||
|
initial_users_data = {
|
||||||
|
"test@example.com": {
|
||||||
|
"full_name": "Test User",
|
||||||
|
"email": "test@example.com",
|
||||||
|
"password": "hashed_password",
|
||||||
|
"storage_quota_gb": 10,
|
||||||
|
"storage_used_gb": 0,
|
||||||
|
"parent_email": None,
|
||||||
|
"shared_items": {}
|
||||||
|
},
|
||||||
|
"child@example.com": {
|
||||||
|
"full_name": "Child User",
|
||||||
|
"email": "child@example.com",
|
||||||
|
"password": "hashed_password",
|
||||||
|
"storage_quota_gb": 5,
|
||||||
|
"storage_used_gb": 0,
|
||||||
|
"parent_email": "test@example.com",
|
||||||
|
"shared_items": {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
with open(users_json_path, "w") as f:
|
||||||
|
json.dump(initial_users_data, f)
|
||||||
|
return users_json_path
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def file_service_instance(temp_user_files_dir, temp_users_json):
|
||||||
|
"""Fixture to provide a FileService instance with temporary directories."""
|
||||||
|
from retoors.services.file_service import FileService
|
||||||
|
return FileService(temp_user_files_dir, temp_users_json)
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def logged_in_client(aiohttp_client, create_test_app, mocker):
|
||||||
|
"""Fixture to provide an aiohttp client with a logged-in user."""
|
||||||
|
app = create_test_app
|
||||||
|
client = await aiohttp_client(app)
|
||||||
|
|
||||||
|
user_service = app["user_service"]
|
||||||
|
|
||||||
|
def mock_create_user(full_name, email, password, parent_email=None):
|
||||||
|
return {
|
||||||
|
"full_name": full_name,
|
||||||
|
"email": email,
|
||||||
|
"password": "hashed_password",
|
||||||
|
"storage_quota_gb": 10,
|
||||||
|
"storage_used_gb": 0,
|
||||||
|
"parent_email": parent_email,
|
||||||
|
"shared_items": {}
|
||||||
|
}
|
||||||
|
|
||||||
|
def mock_authenticate_user(email, password):
|
||||||
|
return {
|
||||||
|
"email": email,
|
||||||
|
"full_name": "Test User",
|
||||||
|
"is_admin": False,
|
||||||
|
"storage_quota_gb": 10,
|
||||||
|
"storage_used_gb": 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def mock_get_user_by_email(email):
|
||||||
|
return {
|
||||||
|
"email": email,
|
||||||
|
"full_name": "Test User",
|
||||||
|
"is_admin": False,
|
||||||
|
"storage_quota_gb": 10,
|
||||||
|
"storage_used_gb": 0
|
||||||
|
}
|
||||||
|
|
||||||
|
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||||
|
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||||
|
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||||
|
|
||||||
|
await client.post(
|
||||||
|
"/register",
|
||||||
|
data={
|
||||||
|
"full_name": "Test User",
|
||||||
|
"email": "test@example.com",
|
||||||
|
"password": "password",
|
||||||
|
"confirm_password": "password",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await client.post(
|
||||||
|
"/login", data={"email": "test@example.com", "password": "password"}
|
||||||
|
)
|
||||||
|
|
||||||
|
return client
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
|
||||||
|
"""Fixture to provide an aiohttp client with a logged-in admin user."""
|
||||||
|
app = create_test_app
|
||||||
|
client = await aiohttp_client(app)
|
||||||
|
|
||||||
|
user_service = app["user_service"]
|
||||||
|
|
||||||
|
def mock_create_user(full_name, email, password, parent_email=None):
|
||||||
|
return {
|
||||||
|
"full_name": full_name,
|
||||||
|
"email": email,
|
||||||
|
"password": "hashed_password",
|
||||||
|
"storage_quota_gb": 100,
|
||||||
|
"storage_used_gb": 0,
|
||||||
|
"parent_email": parent_email,
|
||||||
|
"shared_items": {}
|
||||||
|
}
|
||||||
|
|
||||||
|
def mock_authenticate_user(email, password):
|
||||||
|
return {
|
||||||
|
"email": email,
|
||||||
|
"full_name": "Admin User",
|
||||||
|
"is_admin": True,
|
||||||
|
"storage_quota_gb": 100,
|
||||||
|
"storage_used_gb": 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def mock_get_user_by_email(email):
|
||||||
|
return {
|
||||||
|
"email": email,
|
||||||
|
"full_name": "Admin User",
|
||||||
|
"is_admin": True,
|
||||||
|
"storage_quota_gb": 100,
|
||||||
|
"storage_used_gb": 0
|
||||||
|
}
|
||||||
|
|
||||||
|
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||||
|
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||||
|
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||||
|
|
||||||
|
await client.post(
|
||||||
|
"/register",
|
||||||
|
data={
|
||||||
|
"full_name": "Admin User",
|
||||||
|
"email": "admin@example.com",
|
||||||
|
"password": "password",
|
||||||
|
"confirm_password": "password",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await client.post(
|
||||||
|
"/login", data={"email": "admin@example.com", "password": "password"}
|
||||||
|
)
|
||||||
|
return client
|
||||||
|
|
||||||
|
|
||||||
|
# --- FileService Tests ---
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_list_files_empty(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
files = await file_service_instance.list_files(user_email)
|
||||||
|
assert files == []
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_create_folder(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
folder_name = "my_new_folder"
|
||||||
|
success = await file_service_instance.create_folder(user_email, folder_name)
|
||||||
|
assert success
|
||||||
|
expected_path = temp_user_files_dir / user_email / folder_name
|
||||||
|
assert expected_path.is_dir()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_create_folder_exists(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
folder_name = "existing_folder"
|
||||||
|
(temp_user_files_dir / user_email).mkdir(parents=True)
|
||||||
|
(temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
|
||||||
|
success = await file_service_instance.create_folder(user_email, folder_name)
|
||||||
|
assert not success # Should return False if folder already exists
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_upload_file(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "document.txt"
|
||||||
|
file_content = b"Hello, world!"
|
||||||
|
success = await file_service_instance.upload_file(user_email, file_name, file_content)
|
||||||
|
assert success
|
||||||
|
expected_path = temp_user_files_dir / user_email / file_name
|
||||||
|
assert expected_path.is_file()
|
||||||
|
assert expected_path.read_bytes() == file_content
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_list_files_with_content(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
await file_service_instance.create_folder(user_email, "folder1")
|
||||||
|
await file_service_instance.upload_file(user_email, "file1.txt", b"content1")
|
||||||
|
await file_service_instance.upload_file(user_email, "folder1/file2.txt", b"content2")
|
||||||
|
|
||||||
|
files = await file_service_instance.list_files(user_email)
|
||||||
|
assert len(files) == 2
|
||||||
|
assert any(f["name"] == "folder1" and f["is_dir"] for f in files)
|
||||||
|
assert any(f["name"] == "file1.txt" and not f["is_dir"] for f in files)
|
||||||
|
|
||||||
|
files_in_folder1 = await file_service_instance.list_files(user_email, "folder1")
|
||||||
|
assert len(files_in_folder1) == 1
|
||||||
|
assert files_in_folder1[0]["name"] == "file2.txt"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_download_file(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "download.txt"
|
||||||
|
file_content = b"Downloadable content."
|
||||||
|
await file_service_instance.upload_file(user_email, file_name, file_content)
|
||||||
|
|
||||||
|
content, name = await file_service_instance.download_file(user_email, file_name)
|
||||||
|
assert content == file_content
|
||||||
|
assert name == file_name
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_download_file_not_found(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
content = await file_service_instance.download_file(user_email, "nonexistent.txt")
|
||||||
|
assert content is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_delete_file(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "to_delete.txt"
|
||||||
|
(temp_user_files_dir / user_email).mkdir(exist_ok=True)
|
||||||
|
(temp_user_files_dir / user_email / file_name).write_bytes(b"delete me")
|
||||||
|
|
||||||
|
success = await file_service_instance.delete_item(user_email, file_name)
|
||||||
|
assert success
|
||||||
|
assert not (temp_user_files_dir / user_email / file_name).exists()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_delete_folder(file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
folder_name = "folder_to_delete"
|
||||||
|
(temp_user_files_dir / user_email).mkdir(parents=True)
|
||||||
|
(temp_user_files_dir / user_email / folder_name).mkdir(parents=True)
|
||||||
|
(temp_user_files_dir / user_email / folder_name / "nested.txt").write_bytes(b"nested")
|
||||||
|
|
||||||
|
success = await file_service_instance.delete_item(user_email, folder_name)
|
||||||
|
assert success
|
||||||
|
assert not (temp_user_files_dir / user_email / folder_name).exists()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_delete_nonexistent(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
success = await file_service_instance.delete_item(user_email, "nonexistent_item")
|
||||||
|
assert not success
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_generate_share_link(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_path = "shareable.txt"
|
||||||
|
await file_service_instance.upload_file(user_email, file_path, b"share content")
|
||||||
|
|
||||||
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
||||||
|
assert share_id is not None
|
||||||
|
assert isinstance(share_id, str)
|
||||||
|
|
||||||
|
shared_item = await file_service_instance.get_shared_item(share_id)
|
||||||
|
assert shared_item is not None
|
||||||
|
assert shared_item["user_email"] == user_email
|
||||||
|
assert shared_item["item_path"] == file_path
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_get_shared_file_content(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_path = "shared_file.txt"
|
||||||
|
content_to_share = b"This is shared content."
|
||||||
|
await file_service_instance.upload_file(user_email, file_path, content_to_share)
|
||||||
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
||||||
|
|
||||||
|
retrieved_content, filename = await file_service_instance.get_shared_file_content(share_id)
|
||||||
|
assert retrieved_content == content_to_share
|
||||||
|
assert filename == "shared_file.txt"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_get_shared_folder_content(file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
folder_path = "shared_folder"
|
||||||
|
await file_service_instance.create_folder(user_email, folder_path)
|
||||||
|
await file_service_instance.upload_file(user_email, f"{folder_path}/nested.txt", b"nested content")
|
||||||
|
share_id = await file_service_instance.generate_share_link(user_email, folder_path)
|
||||||
|
|
||||||
|
retrieved_content = await file_service_instance.get_shared_folder_content(share_id)
|
||||||
|
assert len(retrieved_content) == 1
|
||||||
|
assert retrieved_content[0]["name"] == "nested.txt"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_service_shared_link_expiry(file_service_instance, mocker):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_path = "expiring_file.txt"
|
||||||
|
await file_service_instance.upload_file(user_email, file_path, b"expiring content")
|
||||||
|
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
||||||
|
|
||||||
|
# Mock datetime to simulate an expired link
|
||||||
|
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
|
||||||
|
mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime))
|
||||||
|
datetime.datetime.now.return_value = future_time
|
||||||
|
|
||||||
|
shared_item = await file_service_instance.get_shared_item(share_id)
|
||||||
|
assert shared_item is None
|
||||||
|
|
||||||
|
|
||||||
|
# --- FileBrowserView Tests ---
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_get_unauthorized(client: TestClient):
|
||||||
|
resp = await client.get("/files", allow_redirects=False)
|
||||||
|
assert resp.status == 302 # Redirect to login
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_get_authorized_empty(logged_in_client: TestClient):
|
||||||
|
resp = await logged_in_client.get("/files")
|
||||||
|
assert resp.status == 200
|
||||||
|
text = await resp.text()
|
||||||
|
assert "No files found in this directory." in text
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_get_authorized_with_files(logged_in_client: TestClient, file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
await file_service_instance.create_folder(user_email, "my_folder")
|
||||||
|
await file_service_instance.upload_file(user_email, "my_file.txt", b"some content")
|
||||||
|
|
||||||
|
resp = await logged_in_client.get("/files")
|
||||||
|
assert resp.status == 200
|
||||||
|
text = await resp.text()
|
||||||
|
assert "my_folder" in text
|
||||||
|
assert "my_file.txt" in text
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_new_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": "new_folder_via_web"}, allow_redirects=False)
|
||||||
|
assert resp.status == 302 # Redirect
|
||||||
|
assert resp.headers["Location"].startswith("/files")
|
||||||
|
|
||||||
|
expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
|
||||||
|
assert expected_path.is_dir()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_content = b"Uploaded content from web."
|
||||||
|
|
||||||
|
from io import BytesIO
|
||||||
|
data = aiohttp.FormData()
|
||||||
|
data.add_field('file',
|
||||||
|
BytesIO(file_content),
|
||||||
|
filename='uploaded.txt',
|
||||||
|
content_type='text/plain')
|
||||||
|
|
||||||
|
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
|
||||||
|
assert resp.status == 302 # Redirect
|
||||||
|
assert resp.headers["Location"].startswith("/files")
|
||||||
|
|
||||||
|
expected_path = temp_user_files_dir / user_email / "uploaded.txt"
|
||||||
|
assert expected_path.is_file()
|
||||||
|
assert expected_path.read_bytes() == file_content
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_download_file(logged_in_client: TestClient, file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "web_download.txt"
|
||||||
|
file_content = b"Content to be downloaded via web."
|
||||||
|
await file_service_instance.upload_file(user_email, file_name, file_content)
|
||||||
|
|
||||||
|
resp = await logged_in_client.get(f"/files/download/{file_name}")
|
||||||
|
assert resp.status == 200
|
||||||
|
assert resp.headers["Content-Disposition"] == f"attachment; filename={file_name}"
|
||||||
|
assert await resp.read() == file_content
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
|
||||||
|
resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
|
||||||
|
assert resp.status == 404
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "web_delete.txt"
|
||||||
|
await file_service_instance.upload_file(user_email, file_name, b"delete this")
|
||||||
|
|
||||||
|
expected_path = temp_user_files_dir / user_email / file_name
|
||||||
|
assert expected_path.is_file()
|
||||||
|
|
||||||
|
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
|
||||||
|
assert resp.status == 302 # Redirect
|
||||||
|
assert resp.headers["Location"].startswith("/files")
|
||||||
|
assert not expected_path.is_file()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_delete_folder(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
folder_name = "web_delete_folder"
|
||||||
|
await file_service_instance.create_folder(user_email, folder_name)
|
||||||
|
await file_service_instance.upload_file(user_email, f"{folder_name}/nested.txt", b"nested")
|
||||||
|
|
||||||
|
expected_path = temp_user_files_dir / user_email / folder_name
|
||||||
|
assert expected_path.is_dir()
|
||||||
|
|
||||||
|
resp = await logged_in_client.post(f"/files/delete/{folder_name}", allow_redirects=False)
|
||||||
|
assert resp.status == 302 # Redirect
|
||||||
|
assert resp.headers["Location"].startswith("/files")
|
||||||
|
assert not expected_path.is_dir()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance):
|
||||||
|
user_email = "test@example.com"
|
||||||
|
file_name = "web_share.txt"
|
||||||
|
await file_service_instance.upload_file(user_email, file_name, b"shareable content")
|
||||||
|
|
||||||
|
resp = await logged_in_client.post(f"/files/share/{file_name}")
|
||||||
|
assert resp.status == 200
|
||||||
|
data = await resp.json()
|
||||||
|
assert "share_link" in data
|
||||||
|
assert "http" in data["share_link"] # Check if it's a URL
|
||||||
|
|
||||||
|
# Verify the shared item can be retrieved
|
||||||
|
# The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID
|
||||||
|
share_id = data["share_link"].split("/")[-1]
|
||||||
|
shared_item = await file_service_instance.get_shared_item(share_id)
|
||||||
|
assert shared_item is not None
|
||||||
|
assert shared_item["item_path"] == file_name
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
|
||||||
|
"""Fixture to create a test aiohttp application with mocked services."""
|
||||||
|
from aiohttp import web
|
||||||
|
from retoors.middlewares import user_middleware, error_middleware
|
||||||
|
from retoors.services.user_service import UserService
|
||||||
|
from retoors.routes import setup_routes
|
||||||
|
import aiohttp_jinja2
|
||||||
|
import jinja2
|
||||||
|
|
||||||
|
app = web.Application()
|
||||||
|
|
||||||
|
# Setup session for the test app
|
||||||
|
project_root = Path(__file__).parent.parent
|
||||||
|
env_file_path = project_root / ".env"
|
||||||
|
secret_key = get_or_create_session_secret_key(env_file_path)
|
||||||
|
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
|
||||||
|
|
||||||
|
app.middlewares.append(error_middleware)
|
||||||
|
app.middlewares.append(user_middleware)
|
||||||
|
|
||||||
|
# Mock UserService
|
||||||
|
mock_user_service = mocker.MagicMock(spec=UserService)
|
||||||
|
|
||||||
|
# Mock scheduler
|
||||||
|
mock_scheduler = mocker.MagicMock()
|
||||||
|
mock_scheduler.spawn = mocker.AsyncMock()
|
||||||
|
mock_scheduler.close = mocker.AsyncMock()
|
||||||
|
|
||||||
|
app["user_service"] = mock_user_service
|
||||||
|
app["file_service"] = file_service_instance
|
||||||
|
app["scheduler"] = mock_scheduler
|
||||||
|
|
||||||
|
# Setup Jinja2 for templates
|
||||||
|
base_path = Path(__file__).parent.parent / "retoors"
|
||||||
|
templates_path = base_path / "templates"
|
||||||
|
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
|
||||||
|
|
||||||
|
setup_routes(app)
|
||||||
|
return app
|
||||||
Loading…
Reference in New Issue
Block a user