Compare commits

...

10 Commits

Author SHA1 Message Date
f54941dd80 Broken. 2025-11-09 02:34:06 +01:00
059457deae Update. 2025-11-09 02:14:26 +01:00
5d3d0b162d Update. 2025-11-09 01:54:31 +01:00
ea8af383cc Update. 2025-11-09 01:49:11 +01:00
6e47d43a03 Update. 2025-11-09 01:35:03 +01:00
88d57c3837 Update. 2025-11-09 01:34:53 +01:00
9e9907bc00 Update. 2025-11-09 00:52:26 +01:00
c6fb77c89d Update. 2025-11-09 00:52:16 +01:00
81f1cfd200 Update. 2025-11-09 00:33:57 +01:00
e228a2e59c Update. 2025-11-09 00:27:13 +01:00
18 changed files with 1439 additions and 474 deletions

View File

@ -9,3 +9,4 @@ pytest
pytest-aiohttp
aiohttp-test-utils
pytest-mock
pillow

View File

@ -6,13 +6,17 @@ from ..services.user_service import UserService # Import UserService
def login_required(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
session = await get_session(self.request)
if not getattr(self, 'request', None):
request = self
else:
request = self.request
session = await get_session(request)
user_email = session.get('user_email')
if not user_email:
raise web.HTTPFound('/login')
user_service: UserService = self.request.app["user_service"]
user_service: UserService = request.app["user_service"]
user = user_service.get_user_by_email(user_email)
if not user:
@ -21,6 +25,6 @@ def login_required(func):
raise web.HTTPFound('/login')
# Ensure the user object is available in the request for views
self.request["user"] = user
request["user"] = user
return await func(self, *args, **kwargs)
return wrapper

View File

@ -11,7 +11,7 @@ from .routes import setup_routes
from .services.user_service import UserService
from .services.config_service import ConfigService
from .services.file_service import FileService # Import FileService
from .middlewares import user_middleware, error_middleware
from .middlewares import user_middleware, error_middleware,request_hybrid_middleware
from .helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key # Import new function
@ -40,6 +40,7 @@ def create_app():
# The order of middleware registration matters.
# They are executed in the order they are added.
app.middlewares.append(request_hybrid_middleware)
app.middlewares.append(error_middleware)
# Setup session

View File

@ -1,6 +1,10 @@
from aiohttp import web
from aiohttp_session import get_session
@web.middleware
async def request_hybrid_middleware(request, handler):
setattr(request,'request', request)
return await handler(request)
@web.middleware
async def user_middleware(request, handler):
@ -20,3 +24,4 @@ async def error_middleware(request, handler):
raise # Re-raise HTTPException to see original traceback
except Exception:
raise # Re-raise generic Exception to see original traceback

View File

@ -1,5 +1,6 @@
from .views.auth import LoginView, RegistrationView, LogoutView, ForgotPasswordView, ResetPasswordView
from .views.site import SiteView, OrderView, FileBrowserView, UserManagementView
from .views.upload import UploadView
from .views.admin import get_users, add_user, update_user_quota, delete_user, get_user_details, delete_team
@ -30,10 +31,14 @@ def setup_routes(app):
app.router.add_post("/users/{email}/delete", UserManagementView, name="delete_user_page")
app.router.add_view("/files", FileBrowserView, name="file_browser")
app.router.add_post("/files/new_folder", FileBrowserView, name="new_folder")
app.router.add_post("/files/upload", FileBrowserView, name="upload_file")
app.router.add_get("/files/download/{file_path:.*}", FileBrowserView, name="download_file")
app.router.add_post("/files/upload", UploadView, name="upload_file")
app.router.add_get("/files/download/{file_path:.*}", FileBrowserView.get_download_file, name="download_file")
app.router.add_post("/files/share/{file_path:.*}", FileBrowserView, name="share_file")
app.router.add_post("/files/delete/{file_path:.*}", FileBrowserView, name="delete_item")
app.router.add_post("/files/delete_multiple", FileBrowserView, name="delete_multiple_items")
app.router.add_post("/files/share_multiple", FileBrowserView, name="share_multiple_items")
app.router.add_get("/shared_file/{share_id}", FileBrowserView.shared_file_handler, name="shared_file")
app.router.add_get("/shared_file/{share_id}/download", FileBrowserView.download_shared_file_handler, name="download_shared_file")
# Admin API routes for user and team management
app.router.add_get("/api/users", get_users, name="api_get_users")

View File

@ -12,8 +12,11 @@ class ConfigService:
def _load_config(self):
config_from_file = {}
if self._config_path.exists():
try:
with open(self._config_path, "r") as f:
config_from_file = json.load(f)
except json.JSONDecodeError:
config_from_file = {}
# Override with environment variables
config_from_env = {

View File

@ -4,35 +4,53 @@ from pathlib import Path
import shutil
import uuid
import datetime
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class FileService:
def __init__(self, base_dir: Path, users_data_path: Path):
self.base_dir = base_dir
self.users_data_path = users_data_path
self.base_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"FileService initialized with base_dir: {self.base_dir} and users_data_path: {self.users_data_path}")
async def _load_users_data(self):
"""Loads user data from the JSON file."""
if not self.users_data_path.exists():
return {}
logger.warning(f"users_data_path does not exist: {self.users_data_path}")
return []
async with aiofiles.open(self.users_data_path, mode="r") as f:
content = await f.read()
return json.loads(content) if content else {}
try:
return json.loads(content) if content else []
except json.JSONDecodeError:
logger.error(f"JSONDecodeError when loading users data from {self.users_data_path}")
return []
async def _save_users_data(self, data):
"""Saves user data to the JSON file."""
async with aiofiles.open(self.users_data_path, mode="w") as f:
await f.write(json.dumps(data, indent=4))
logger.debug(f"Saved users data to {self.users_data_path}")
def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
"""Constructs the absolute path for a user's file or directory."""
user_dir = self.base_dir / user_email
return user_dir / relative_path
full_path = user_dir / relative_path
logger.debug(f"Constructed path for user '{user_email}', relative_path '{relative_path}': {full_path}")
return full_path
async def list_files(self, user_email: str, path: str = "") -> list:
"""Lists files and directories for a given user within a specified path."""
user_path = self._get_user_file_path(user_email, path)
if not user_path.is_dir():
logger.warning(f"list_files: User path is not a directory or does not exist: {user_path}")
return []
files_list = []
@ -47,14 +65,17 @@ class FileService:
"last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
}
files_list.append(file_info)
logger.debug(f"Listed {len(files_list)} items for user '{user_email}' in path '{path}'")
return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
async def create_folder(self, user_email: str, folder_path: str) -> bool:
"""Creates a new folder for the user."""
full_path = self._get_user_file_path(user_email, folder_path)
if full_path.exists():
logger.warning(f"create_folder: Folder already exists: {full_path}")
return False # Folder already exists
full_path.mkdir(parents=True, exist_ok=True)
logger.info(f"create_folder: Folder created: {full_path}")
return True
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
@ -63,38 +84,49 @@ class FileService:
full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
async with aiofiles.open(full_path, mode="wb") as f:
await f.write(content)
logger.info(f"upload_file: File uploaded to: {full_path}")
return True
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
"""Downloads a file for the user."""
full_path = self._get_user_file_path(user_email, file_path)
logger.debug(f"download_file: Attempting to download file from: {full_path}")
if full_path.is_file():
async with aiofiles.open(full_path, mode="rb") as f:
content = await f.read()
logger.info(f"download_file: Successfully read file: {full_path}")
return content, full_path.name
logger.warning(f"download_file: File not found or is not a file: {full_path}")
return None
async def delete_item(self, user_email: str, item_path: str) -> bool:
"""Deletes a file or folder for the user."""
full_path = self._get_user_file_path(user_email, item_path)
logger.debug(f"delete_item: Attempting to delete item: {full_path}")
if not full_path.exists():
logger.warning(f"delete_item: Item does not exist: {full_path}")
return False
if full_path.is_file():
full_path.unlink()
logger.info(f"delete_item: File deleted: {full_path}")
elif full_path.is_dir():
shutil.rmtree(full_path)
logger.info(f"delete_item: Directory deleted: {full_path}")
return True
async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
"""Generates a shareable link for a file or folder."""
logger.debug(f"generate_share_link: Generating link for user '{user_email}', item '{item_path}'")
users_data = await self._load_users_data()
user = users_data.get(user_email)
user = next((u for u in users_data if u.get("email") == user_email), None)
if not user:
logger.warning(f"generate_share_link: User not found: {user_email}")
return None
full_path = self._get_user_file_path(user_email, item_path)
if not full_path.exists():
logger.warning(f"generate_share_link: Item does not exist: {full_path}")
return None
share_id = str(uuid.uuid4())
@ -107,37 +139,60 @@ class FileService:
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
}
await self._save_users_data(users_data)
logger.info(f"generate_share_link: Share link generated with ID: {share_id} for item: {item_path}")
return share_id
async def get_shared_item(self, share_id: str) -> dict | None:
"""Retrieves information about a shared item."""
logger.debug(f"get_shared_item: Retrieving shared item with ID: {share_id}")
users_data = await self._load_users_data()
for user_email, user_info in users_data.items():
for user_info in users_data:
if "shared_items" in user_info and share_id in user_info["shared_items"]:
shared_item = user_info["shared_items"][share_id]
expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
logger.info(f"get_shared_item: Found valid shared item for ID: {share_id}")
return shared_item
else:
logger.warning(f"get_shared_item: Shared item {share_id} has expired.")
logger.warning(f"get_shared_item: No valid shared item found for ID: {share_id}")
return None
async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None:
async def get_shared_file_content(self, share_id: str, requested_file_path: str | None = None) -> tuple[bytes, str] | None:
"""Retrieves the content of a shared file."""
logger.debug(f"get_shared_file_content: Retrieving content for shared file with ID: {share_id}, requested_file_path: {requested_file_path}")
shared_item = await self.get_shared_item(share_id)
if not shared_item:
return None
user_email = shared_item["user_email"]
item_path = shared_item["item_path"]
full_path = self._get_user_file_path(user_email, item_path)
item_path = shared_item["item_path"] # This is the path of the originally shared item (file or folder)
if full_path.is_file():
async with aiofiles.open(full_path, mode="rb") as f:
# Construct the full path to the originally shared item
full_shared_item_path = self._get_user_file_path(user_email, item_path)
target_file_path = full_shared_item_path
if requested_file_path:
# If a specific file within a shared folder is requested
target_file_path = self._get_user_file_path(user_email, requested_file_path)
# Security check: Ensure the requested file is actually within the shared item's directory
try:
target_file_path.relative_to(full_shared_item_path)
except ValueError:
logger.warning(f"get_shared_file_content: Requested file path '{requested_file_path}' is not within shared item path '{item_path}' for share_id: {share_id}")
return None
if target_file_path.is_file():
async with aiofiles.open(target_file_path, mode="rb") as f:
content = await f.read()
return content, full_path.name
logger.info(f"get_shared_file_content: Successfully read content for shared file: {target_file_path}")
return content, target_file_path.name
logger.warning(f"get_shared_file_content: Shared item path is not a file or does not exist: {target_file_path}")
return None
async def get_shared_folder_content(self, share_id: str) -> list | None:
"""Retrieves the content of a shared folder."""
logger.debug(f"get_shared_folder_content: Retrieving content for shared folder with ID: {share_id}")
shared_item = await self.get_shared_item(share_id)
if not shared_item:
return None
@ -147,5 +202,7 @@ class FileService:
full_path = self._get_user_file_path(user_email, item_path)
if full_path.is_dir():
logger.info(f"get_shared_folder_content: Listing files for shared folder: {full_path}")
return await self.list_files(user_email, item_path)
logger.warning(f"get_shared_folder_content: Shared item path is not a directory or does not exist: {full_path}")
return None

View File

@ -14,8 +14,11 @@ class UserService:
def _load_users(self) -> List[Dict[str, Any]]:
if not self._users_path.exists():
return []
try:
with open(self._users_path, "r") as f:
return json.load(f)
except json.JSONDecodeError:
return []
def _save_users(self):
with open(self._users_path, "w") as f:

View File

@ -65,8 +65,8 @@
}
.btn-small {
padding: 0.4rem 0.8rem;
font-size: 0.85rem;
padding: 0.2rem 0.4rem; /* Reduced padding */
font-size: 0.75rem; /* Reduced font size */
border: 1px solid var(--border-color);
border-radius: 4px;
background-color: var(--background-color);
@ -236,3 +236,107 @@
min-width: 120px;
}
}
/* Styles for the new upload functionality */
.upload-area {
border: 2px dashed var(--border-color);
border-radius: 8px;
padding: 20px;
text-align: center;
margin-bottom: 20px;
background-color: var(--background-color);
}
.upload-button {
display: inline-block;
padding: 10px 20px;
cursor: pointer;
margin-top: 10px;
}
.selected-files-preview {
margin-top: 20px;
text-align: left;
max-height: 200px;
overflow-y: auto;
border-top: 1px solid var(--border-color);
padding-top: 10px;
}
.file-entry {
display: flex;
align-items: center;
padding: 8px 0;
border-bottom: 1px solid var(--border-color);
}
.file-entry:last-child {
border-bottom: none;
}
.file-entry .file-name {
flex-grow: 1;
margin-right: 10px;
color: var(--text-color);
}
.file-entry .file-size {
color: var(--light-text-color);
font-size: 0.85rem;
}
.file-entry .thumbnail-preview {
width: 40px;
height: 40px;
margin-left: 10px;
border-radius: 4px;
overflow: hidden;
display: flex;
align-items: center;
justify-content: center;
background-color: var(--background-color);
border: 1px solid var(--border-color);
}
.file-entry .thumbnail-preview img {
max-width: 100%;
max-height: 100%;
display: block;
object-fit: contain;
}
.progress-bar-container {
margin-top: 15px;
text-align: left;
border-top: 1px solid var(--border-color);
padding-top: 10px;
}
.progress-bar-container .file-name {
font-weight: bold;
margin-bottom: 5px;
color: var(--text-color);
}
.progress-bar-wrapper {
width: 100%;
background-color: var(--background-color);
border-radius: 5px;
overflow: hidden;
height: 10px;
margin-bottom: 5px;
}
.progress-bar {
height: 100%;
width: 0%;
background-color: var(--accent-color);
border-radius: 5px;
transition: width 0.3s ease-in-out;
}
.progress-text {
font-size: 0.85rem;
color: var(--light-text-color);
text-align: right;
}

View File

@ -0,0 +1,116 @@
export function showUploadModal() {
document.getElementById('upload-modal').style.display = 'block';
// Clear previous selections and progress
document.getElementById('selected-files-preview').innerHTML = '';
document.getElementById('upload-progress-container').innerHTML = '';
document.getElementById('file-input-multiple').value = ''; // Clear selected files
document.getElementById('start-upload-btn').disabled = true;
}
document.addEventListener('DOMContentLoaded', () => {
const fileInput = document.getElementById('file-input-multiple');
const selectedFilesPreview = document.getElementById('selected-files-preview');
const startUploadBtn = document.getElementById('start-upload-btn');
const uploadProgressContainer = document.getElementById('upload-progress-container');
let filesToUpload = [];
fileInput.addEventListener('change', (event) => {
filesToUpload = Array.from(event.target.files);
selectedFilesPreview.innerHTML = ''; // Clear previous previews
uploadProgressContainer.innerHTML = ''; // Clear previous progress bars
if (filesToUpload.length > 0) {
startUploadBtn.disabled = false;
filesToUpload.forEach(file => {
const fileEntry = document.createElement('div');
fileEntry.className = 'file-entry';
fileEntry.innerHTML = `
<span class="file-name">${file.name}</span>
<span class="file-size">(${(file.size / 1024 / 1024).toFixed(2)} MB)</span>
<div class="thumbnail-preview"></div>
`;
selectedFilesPreview.appendChild(fileEntry);
// Display thumbnail for image files
if (file.type.startsWith('image/')) {
const reader = new FileReader();
reader.onload = (e) => {
const img = document.createElement('img');
img.src = e.target.result;
fileEntry.querySelector('.thumbnail-preview').appendChild(img);
};
reader.readAsDataURL(file);
}
});
} else {
startUploadBtn.disabled = true;
}
});
startUploadBtn.addEventListener('click', () => {
if (filesToUpload.length > 0) {
uploadFiles(filesToUpload);
}
});
async function uploadFiles(files) {
startUploadBtn.disabled = true; // Disable button during upload
uploadProgressContainer.innerHTML = ''; // Clear previous progress
const currentPath = new URLSearchParams(window.location.search).get('path') || '';
for (const file of files) {
const formData = new FormData();
formData.append('file', file);
const progressBarContainer = document.createElement('div');
progressBarContainer.className = 'progress-bar-container';
progressBarContainer.innerHTML = `
<div class="file-name">${file.name}</div>
<div class="progress-bar-wrapper">
<div class="progress-bar" id="progress-${file.name.replace(/\./g, '-')}" style="width: 0%;"></div>
</div>
<div class="progress-text" id="progress-text-${file.name.replace(/\./g, '-')}" >0%</div>
`;
uploadProgressContainer.appendChild(progressBarContainer);
const xhr = new XMLHttpRequest();
xhr.open('POST', `/files/upload?current_path=${encodeURIComponent(currentPath)}`, true);
xhr.upload.addEventListener('progress', (event) => {
if (event.lengthComputable) {
const percent = (event.loaded / event.total) * 100;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `${percent}%`;
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `${Math.round(percent)}%`;
}
});
xhr.addEventListener('load', () => {
if (xhr.status === 200) {
console.log(`File ${file.name} uploaded successfully.`);
// Update progress to 100% on completion
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `100%`;
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `100% (Done)`;
} else {
console.error(`Error uploading ${file.name}: ${xhr.statusText}`);
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Failed (${xhr.status})`;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
}
});
xhr.addEventListener('error', () => {
console.error(`Network error uploading ${file.name}.`);
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Network Error`;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
});
xhr.send(formData);
}
// After all files are sent, refresh the page to show new files
// A small delay to allow server to process and update file list
setTimeout(() => {
window.location.reload();
}, 1000);
}
});

View File

@ -1,5 +1,6 @@
import './components/slider.js';
import './components/navigation.js'; // Assuming navigation.js might be needed globally
import { showUploadModal } from './components/upload.js'; // Import showUploadModal
document.addEventListener('DOMContentLoaded', () => {
// Logic for custom-slider on order page
@ -82,4 +83,262 @@ document.addEventListener('DOMContentLoaded', () => {
// Initial display based on active button (default to monthly)
updatePricingDisplay('monthly');
}
// --- File Browser Specific Logic ---
// Helper functions for modals
function showNewFolderModal() {
document.getElementById('new-folder-modal').style.display = 'block';
}
function closeModal(modalId) {
document.getElementById(modalId).style.display = 'none';
}
window.closeModal = closeModal; // Make it globally accessible
window.onclick = function(event) {
if (event.target.classList.contains('modal')) {
event.target.style.display = 'none';
}
}
// File actions
function downloadFile(path) {
window.location.href = `/files/download/${path}`;
}
async function shareFile(paths, names) {
const modal = document.getElementById('share-modal');
const linkContainer = document.getElementById('share-link-container');
const loading = document.getElementById('share-loading');
const shareLinkInput = document.getElementById('share-link-input');
const shareFileName = document.getElementById('share-file-name');
const shareLinksList = document.getElementById('share-links-list'); // New element for multiple links
// Clear previous content
shareLinkInput.value = '';
if (shareLinksList) shareLinksList.innerHTML = '';
linkContainer.style.display = 'none';
loading.style.display = 'block';
modal.style.display = 'block';
if (paths.length === 1) {
shareFileName.textContent = `Sharing: ${names[0]}`;
} else {
shareFileName.textContent = `Sharing ${paths.length} items`;
}
try {
const response = await fetch(`/files/share_multiple`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ paths: paths })
});
const data = await response.json();
if (data.share_links && data.share_links.length > 0) {
if (data.share_links.length === 1) {
shareLinkInput.value = data.share_links[0];
linkContainer.style.display = 'block';
} else {
// Display multiple links
if (!shareLinksList) {
// Create the list if it doesn't exist
const ul = document.createElement('ul');
ul.id = 'share-links-list';
linkContainer.appendChild(ul);
shareLinksList = ul;
}
data.share_links.forEach(item => {
const li = document.createElement('li');
li.innerHTML = `<strong>${item.name}:</strong> <input type="text" value="${item.link}" readonly class="form-input share-link-item-input"> <button class="btn-primary copy-share-link-item-btn" data-link="${item.link}">Copy</button>`;
shareLinksList.appendChild(li);
});
linkContainer.style.display = 'block';
}
loading.style.display = 'none';
} else {
loading.textContent = 'Error generating share link(s)';
}
} catch (error) {
console.error('Error sharing files:', error);
loading.textContent = 'Error generating share link(s)';
}
}
function copyShareLink() {
const input = document.getElementById('share-link-input');
input.select();
document.execCommand('copy');
alert('Share link copied to clipboard');
}
function deleteFile(paths, names) {
const deleteForm = document.getElementById('delete-form');
const deleteMessage = document.getElementById('delete-message');
const deleteModal = document.getElementById('delete-modal');
// Clear previous hidden inputs
deleteForm.querySelectorAll('input[name="paths[]"]').forEach(input => input.remove());
if (Array.isArray(paths) && paths.length > 1) {
deleteMessage.textContent = `Are you sure you want to delete ${paths.length} items? This action cannot be undone.`;
deleteForm.action = `/files/delete_multiple`;
paths.forEach(path => {
const input = document.createElement('input');
input.type = 'hidden';
input.name = 'paths[]';
input.value = path;
deleteForm.appendChild(input);
});
} else {
const path = Array.isArray(paths) ? paths[0] : paths;
const name = Array.isArray(names) ? names[0] : names;
deleteMessage.textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
deleteForm.action = `/files/delete/${path}`;
}
deleteModal.style.display = 'block';
}
// Selection and action buttons
function updateActionButtons() {
const checked = document.querySelectorAll('.file-checkbox:checked');
const downloadBtn = document.getElementById('download-selected-btn');
const shareBtn = document.getElementById('share-selected-btn');
const deleteBtn = document.getElementById('delete-selected-btn');
const hasSelection = checked.length > 0;
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
if (downloadBtn) downloadBtn.disabled = !hasFiles;
if (shareBtn) shareBtn.disabled = !hasSelection;
if (deleteBtn) deleteBtn.disabled = !hasSelection;
}
function downloadSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
downloadFile(checked[0].dataset.path);
}
}
function shareSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
shareFile(paths, names);
}
}
function deleteSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
deleteFile(paths, names);
}
}
// Event Listeners for File Browser
const newFolderBtn = document.getElementById('new-folder-btn');
if (newFolderBtn) {
console.log('Attaching event listener to new-folder-btn');
newFolderBtn.addEventListener('click', showNewFolderModal);
}
const uploadBtn = document.getElementById('upload-btn');
if (uploadBtn) {
console.log('Attaching event listener to upload-btn');
uploadBtn.addEventListener('click', showUploadModal);
}
const createFirstFolderBtn = document.getElementById('create-first-folder-btn');
if (createFirstFolderBtn) {
console.log('Attaching event listener to create-first-folder-btn');
createFirstFolderBtn.addEventListener('click', showNewFolderModal);
}
const uploadFirstFileBtn = document.getElementById('upload-first-file-btn');
if (uploadFirstFileBtn) {
console.log('Attaching event listener to upload-first-file-btn');
uploadFirstFileBtn.addEventListener('click', showUploadModal);
}
const downloadSelectedBtn = document.getElementById('download-selected-btn');
if (downloadSelectedBtn) {
downloadSelectedBtn.addEventListener('click', downloadSelected);
}
const shareSelectedBtn = document.getElementById('share-selected-btn');
if (shareSelectedBtn) {
shareSelectedBtn.addEventListener('click', shareSelected);
}
const deleteSelectedBtn = document.getElementById('delete-selected-btn');
if (deleteSelectedBtn) {
deleteSelectedBtn.addEventListener('click', deleteSelected);
}
const copyShareLinkBtn = document.getElementById('copy-share-link-btn');
if (copyShareLinkBtn) {
copyShareLinkBtn.addEventListener('click', copyShareLink);
}
document.getElementById('select-all')?.addEventListener('change', function(e) {
const checkboxes = document.querySelectorAll('.file-checkbox');
checkboxes.forEach(cb => cb.checked = e.target.checked);
updateActionButtons();
});
document.querySelectorAll('.file-checkbox').forEach(cb => {
cb.addEventListener('change', updateActionButtons);
});
// Event listeners for dynamically created download/share/delete buttons
document.addEventListener('click', (event) => {
if (event.target.classList.contains('download-file-btn')) {
downloadFile(event.target.dataset.path);
} else if (event.target.classList.contains('share-file-btn')) {
const path = event.target.dataset.path;
const name = event.target.dataset.name;
shareFile([path], [name]);
} else if (event.target.classList.contains('delete-file-btn')) {
const path = event.target.dataset.path;
const name = event.target.dataset.name;
deleteFile([path], [name]);
} else if (event.target.classList.contains('copy-share-link-item-btn')) {
const linkToCopy = event.target.dataset.link;
navigator.clipboard.writeText(linkToCopy).then(() => {
alert('Share link copied to clipboard');
}).catch(err => {
console.error('Failed to copy link: ', err);
alert('Failed to copy link');
});
}
});
document.getElementById('search-bar')?.addEventListener('input', function(e) {
const searchTerm = e.target.value.toLowerCase();
const rows = document.querySelectorAll('#file-list-body tr');
rows.forEach(row => {
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
if (name && name.includes(searchTerm)) {
row.style.display = '';
} else {
row.style.display = 'none';
}
});
});
// Initial update of action buttons
updateActionButtons();
});

View File

@ -9,11 +9,11 @@
{% block page_title %}My Files{% endblock %}
{% block dashboard_actions %}
<button class="btn-primary" onclick="showNewFolderModal()">+ New</button>
<button class="btn-outline" onclick="showUploadModal()">Upload</button>
<button class="btn-outline" onclick="downloadSelected()" id="download-btn" disabled>Download</button>
<button class="btn-outline" onclick="shareSelected()" id="share-btn" disabled>Share</button>
<button class="btn-outline" onclick="deleteSelected()" id="delete-btn" disabled>Delete</button>
<button class="btn-primary" id="new-folder-btn">+ New</button>
<button class="btn-outline" id="upload-btn">Upload</button>
<button class="btn-outline" id="download-selected-btn" disabled>⬇️</button>
<button class="btn-outline" id="share-selected-btn" disabled>đź”—</button>
<button class="btn-outline" id="delete-selected-btn" disabled>Delete</button>
{% endblock %}
{% block dashboard_content %}
@ -70,10 +70,10 @@
<td>
<div class="action-buttons">
{% if not item.is_dir %}
<button class="btn-small" onclick="downloadFile('{{ item.path }}')">Download</button>
<button class="btn-small download-file-btn" data-path="{{ item.path }}">⬇️</button>
{% endif %}
<button class="btn-small" onclick="shareFile('{{ item.path }}', '{{ item.name }}')">Share</button>
<button class="btn-small btn-danger" onclick="deleteFile('{{ item.path }}', '{{ item.name }}')">Delete</button>
<button class="btn-small share-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">đź”—</button>
<button class="btn-small btn-danger delete-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">Delete</button>
</div>
</td>
</tr>
@ -82,8 +82,8 @@
<tr>
<td colspan="6" style="text-align: center; padding: 40px;">
<p>No files found in this directory.</p>
<button class="btn-primary" onclick="showNewFolderModal()" style="margin-top: 10px;">Create your first folder</button>
<button class="btn-outline" onclick="showUploadModal()" style="margin-top: 10px;">Upload a file</button>
<button class="btn-primary" id="create-first-folder-btn" style="margin-top: 10px;">Create your first folder</button>
<button class="btn-outline" id="upload-first-file-btn" style="margin-top: 10px;">Upload a file</button>
</td>
</tr>
{% endif %}
@ -108,15 +108,17 @@
<div id="upload-modal" class="modal">
<div class="modal-content">
<span class="close" onclick="closeModal('upload-modal')">&times;</span>
<h3>Upload File</h3>
<form action="/files/upload" method="post" enctype="multipart/form-data">
<input type="file" name="file" required class="form-input" id="file-input">
<div class="file-info" id="file-info"></div>
<h3>Upload Files</h3>
<div class="upload-area">
<input type="file" name="file" multiple class="form-input" id="file-input-multiple" style="display: none;">
<label for="file-input-multiple" class="btn-outline upload-button">Select Files</label>
<div id="selected-files-preview" class="selected-files-preview"></div>
<div id="upload-progress-container" class="upload-progress-container"></div>
</div>
<div class="modal-actions">
<button type="submit" class="btn-primary">Upload</button>
<button type="button" class="btn-primary" id="start-upload-btn" disabled>Upload</button>
<button type="button" class="btn-outline" onclick="closeModal('upload-modal')">Cancel</button>
</div>
</form>
</div>
</div>
@ -127,7 +129,8 @@
<p id="share-file-name"></p>
<div id="share-link-container" style="display: none;">
<input type="text" id="share-link-input" readonly class="form-input">
<button class="btn-primary" onclick="copyShareLink()">Copy Link</button>
<button class="btn-primary" id="copy-share-link-btn">Copy Link</button>
<div id="share-links-list" class="share-links-list"></div>
</div>
<div id="share-loading">Generating share link...</div>
<div class="modal-actions">
@ -150,146 +153,6 @@
</div>
</div>
<script>
function showNewFolderModal() {
document.getElementById('new-folder-modal').style.display = 'block';
}
function showUploadModal() {
document.getElementById('upload-modal').style.display = 'block';
}
function closeModal(modalId) {
document.getElementById(modalId).style.display = 'none';
}
window.onclick = function(event) {
if (event.target.classList.contains('modal')) {
event.target.style.display = 'none';
}
}
document.getElementById('file-input').addEventListener('change', function(e) {
const file = e.target.files[0];
if (file) {
const size = (file.size / 1024 / 1024).toFixed(2);
document.getElementById('file-info').innerHTML = `Selected: ${file.name} (${size} MB)`;
}
});
function downloadFile(path) {
window.location.href = `/files/download/${path}`;
}
async function shareFile(path, name) {
const modal = document.getElementById('share-modal');
const linkContainer = document.getElementById('share-link-container');
const loading = document.getElementById('share-loading');
document.getElementById('share-file-name').textContent = `Sharing: ${name}`;
linkContainer.style.display = 'none';
loading.style.display = 'block';
modal.style.display = 'block';
try {
const response = await fetch(`/files/share/${path}`, {
method: 'POST'
});
const data = await response.json();
if (data.share_link) {
document.getElementById('share-link-input').value = data.share_link;
linkContainer.style.display = 'block';
loading.style.display = 'none';
}
} catch (error) {
loading.textContent = 'Error generating share link';
}
}
function copyShareLink() {
const input = document.getElementById('share-link-input');
input.select();
document.execCommand('copy');
alert('Share link copied to clipboard');
}
function deleteFile(path, name) {
document.getElementById('delete-message').textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
document.getElementById('delete-form').action = `/files/delete/${path}`;
document.getElementById('delete-modal').style.display = 'block';
}
document.getElementById('select-all').addEventListener('change', function(e) {
const checkboxes = document.querySelectorAll('.file-checkbox');
checkboxes.forEach(cb => cb.checked = e.target.checked);
updateActionButtons();
});
document.querySelectorAll('.file-checkbox').forEach(cb => {
cb.addEventListener('change', updateActionButtons);
});
function updateActionButtons() {
const checked = document.querySelectorAll('.file-checkbox:checked');
const downloadBtn = document.getElementById('download-btn');
const shareBtn = document.getElementById('share-btn');
const deleteBtn = document.getElementById('delete-btn');
const hasSelection = checked.length > 0;
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
downloadBtn.disabled = !hasFiles;
shareBtn.disabled = !hasSelection;
deleteBtn.disabled = !hasSelection;
}
function downloadSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
downloadFile(checked[0].dataset.path);
}
}
function shareSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1) {
const path = checked[0].dataset.path;
const name = checked[0].closest('tr').querySelector('td:nth-child(2)').textContent.trim();
shareFile(path, name);
}
}
function deleteSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
if (checked.length === 1) {
deleteFile(paths[0], names[0]);
} else {
document.getElementById('delete-message').textContent =
`Are you sure you want to delete ${checked.length} items? This action cannot be undone.`;
document.getElementById('delete-modal').style.display = 'block';
}
}
}
document.getElementById('search-bar').addEventListener('input', function(e) {
const searchTerm = e.target.value.toLowerCase();
const rows = document.querySelectorAll('#file-list-body tr');
rows.forEach(row => {
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
if (name && name.includes(searchTerm)) {
row.style.display = '';
} else {
row.style.display = 'none';
}
});
});
</script>
<script type="module" src="/static/js/components/upload.js"></script>
<script type="module" src="/static/js/main.js"></script>
{% endblock %}

View File

@ -0,0 +1,64 @@
{% extends "layouts/base.html" %}
{% block title %}Shared Folder - Retoor's Cloud Solutions{% endblock %}
{% block head %}
<link rel="stylesheet" href="/static/css/components/file_browser.css">
{% endblock %}
{% block content %}
<div class="container dashboard-container">
<h1 class="page-title">Shared Folder</h1>
<div class="file-list-table">
<table>
<thead>
<tr>
<th>Name</th>
<th>Last Modified</th>
<th>Size</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="file-list-body">
{% if files %}
{% for item in files %}
<tr data-path="{{ item.path }}" data-is-dir="{{ item.is_dir }}">
<td>
{% if item.is_dir %}
<img src="/static/images/icon-families.svg" alt="Folder Icon" class="file-icon">
<a href="/shared_file/{{ share_id }}?path={{ item.path }}">{{ item.name }}</a>
{% else %}
<img src="/static/images/icon-professionals.svg" alt="File Icon" class="file-icon">
{{ item.name }}
{% endif %}
</td>
<td>{{ item.last_modified[:10] }}</td>
<td>
{% if item.is_dir %}
--
{% else %}
{{ (item.size / 1024 / 1024)|round(2) }} MB
{% endif %}
</td>
<td>
<div class="action-buttons">
{% if not item.is_dir %}
<a href="/shared_file/{{ share_id }}/download?file_path={{ item.path }}" class="btn-small">⬇️</a>
{% endif %}
</div>
</td>
</tr>
{% endfor %}
{% else %}
<tr>
<td colspan="4" style="text-align: center; padding: 40px;">
<p>No files found in this shared directory.</p>
</td>
</tr>
{% endif %}
</tbody>
</table>
</div>
</div>
{% endblock %}

View File

@ -1,3 +1,4 @@
import logging
from aiohttp import web
import aiohttp_jinja2
import os
@ -7,6 +8,14 @@ from aiohttp.web_response import json_response
from ..helpers.auth import login_required
from .auth import CustomPydanticView
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# PROJECT_DIR is no longer directly used for user files, as FileService manages them
# PROJECT_DIR = Path(__file__).parent.parent.parent / "project"
@ -45,7 +54,7 @@ class SiteView(web.View):
@login_required
async def dashboard(self):
return web.HTTPFound(self.request.app.router["file_browser"].url_for())
raise web.HTTPFound(self.request.app.router["file_browser"].url_for())
async def solutions(self):
return aiohttp_jinja2.render_template(
@ -126,9 +135,6 @@ class SiteView(web.View):
class FileBrowserView(web.View):
@login_required
async def get(self):
if self.request.match_info.get("file_path") is not None:
return await self.get_download_file()
user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"]
@ -157,12 +163,14 @@ class FileBrowserView(web.View):
user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"]
route_name = self.request.match_info.route.name
logger.debug(f"FileBrowserView: POST request for route: {route_name}")
if route_name == "new_folder":
data = await self.request.post()
folder_name = data.get("folder_name")
if not folder_name:
return web.HTTPFound(
logger.warning("FileBrowserView: New folder request missing folder_name")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Folder name is required"
)
@ -170,74 +178,42 @@ class FileBrowserView(web.View):
success = await file_service.create_folder(user_email, folder_name)
if success:
return web.HTTPFound(
logger.info(f"FileBrowserView: Folder '{folder_name}' created successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"Folder '{folder_name}' created successfully"
)
)
else:
return web.HTTPFound(
logger.error(f"FileBrowserView: Failed to create folder '{folder_name}' for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error=f"Folder '{folder_name}' already exists or could not be created"
)
)
elif route_name == "upload_file":
try:
reader = await self.request.multipart()
field = await reader.next()
if not field or field.name != "file":
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="No file selected for upload"
)
)
filename = field.filename
if not filename:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Filename is required"
)
)
content = await field.read()
success = await file_service.upload_file(user_email, filename, content)
if success:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"File '{filename}' uploaded successfully"
)
)
else:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error=f"Failed to upload file '{filename}'"
)
)
except Exception as e:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error=f"Upload error: {str(e)}"
)
)
elif route_name == "share_file":
file_path = self.request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Share file request for path: {file_path} by user {user_email}")
if not file_path:
logger.warning("FileBrowserView: Share file request missing file_path")
return json_response({"error": "File path is required for sharing"}, status=400)
share_id = await file_service.generate_share_link(user_email, file_path)
if share_id:
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
logger.info(f"FileBrowserView: Share link generated: {share_link} for file: {file_path}")
return json_response({"share_link": share_link})
else:
logger.error(f"FileBrowserView: Failed to generate share link for file: {file_path} by user {user_email}")
return json_response({"error": "Failed to generate share link"}, status=500)
elif route_name == "delete_item":
item_path = self.request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Delete item request for path: {item_path} by user {user_email}")
if not item_path:
return web.HTTPFound(
logger.warning("FileBrowserView: Delete item request missing item_path")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Item path is required for deletion"
)
@ -245,38 +221,199 @@ class FileBrowserView(web.View):
success = await file_service.delete_item(user_email, item_path)
if success:
return web.HTTPFound(
logger.info(f"FileBrowserView: Item '{item_path}' deleted successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"Item deleted successfully"
)
)
else:
return web.HTTPFound(
logger.error(f"FileBrowserView: Failed to delete item '{item_path}' for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Failed to delete item - it may not exist"
)
)
return web.HTTPBadRequest(text="Unknown file action")
elif route_name == "delete_multiple_items":
data = await self.request.post()
paths = data.getall("paths[]", [])
logger.debug(f"FileBrowserView: Delete multiple items request for paths: {paths} by user {user_email}")
if not paths:
logger.warning("FileBrowserView: Delete multiple items request missing paths")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="No items selected for deletion"
)
)
all_successful = True
for path in paths:
success = await file_service.delete_item(user_email, path)
if not success:
all_successful = False
logger.error(f"FileBrowserView: Failed to delete item '{path}' for user {user_email} during bulk delete")
if all_successful:
logger.info(f"FileBrowserView: All {len(paths)} items deleted successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"Successfully deleted {len(paths)} items"
)
)
else:
logger.error(f"FileBrowserView: Some items failed to delete for user {user_email} during bulk delete")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Some items failed to delete"
)
)
elif route_name == "share_multiple_items":
data = await self.request.json()
paths = data.get("paths", [])
logger.debug(f"FileBrowserView: Share multiple items request for paths: {paths} by user {user_email}")
if not paths:
logger.warning("FileBrowserView: Share multiple items request missing paths")
return json_response({"error": "No items selected for sharing"}, status=400)
share_links = []
for path in paths:
share_id = await file_service.generate_share_link(user_email, path)
if share_id:
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
# Extract file name from path
file_name = os.path.basename(path)
share_links.append({"name": file_name, "link": share_link})
else:
logger.error(f"FileBrowserView: Failed to generate share link for file: {path} by user {user_email}")
if share_links:
logger.info(f"FileBrowserView: Generated {len(share_links)} share links for user {user_email}")
return json_response({"share_links": share_links})
else:
logger.error(f"FileBrowserView: Failed to generate any share links for user {user_email}")
return json_response({"error": "Failed to generate share links for any selected items"}, status=500)
logger.warning(f"FileBrowserView: Unknown file action for POST request: {route_name}")
return web.Response(status=400, text="Unknown file action")
@login_required
async def get_download_file(self):
user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"]
file_path = self.request.match_info.get("file_path")
request = self # self is the request object here
user_email = request["user"]["email"]
file_service = request.app["file_service"]
file_path = request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Download file request for path: {file_path} by user {user_email}")
if not file_path:
return web.HTTPBadRequest(text="File path is required for download")
logger.warning("FileBrowserView: Download file request missing file_path")
raise web.HTTPBadRequest(text="File path is required for download")
result = await file_service.download_file(user_email, file_path)
if result:
content, filename = result
response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: File '{filename}' downloaded successfully by user {user_email}")
return response
else:
return web.HTTPNotFound(text="File not found")
logger.error(f"FileBrowserView: Failed to download file: {file_path} for user {user_email} - file not found or access denied")
raise web.HTTPNotFound(text="File not found")
async def shared_file_handler(self):
share_id = self.request.match_info.get("share_id")
file_service = self.request.app["file_service"]
logger.debug(f"FileBrowserView: Handling shared file request for share_id: {share_id}")
shared_item = await file_service.get_shared_item(share_id)
if not shared_item:
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id}")
return aiohttp_jinja2.render_template(
"pages/errors/404.html",
self.request,
{"request": self.request, "message": "Shared link is invalid or has expired."},
status=404
)
user_email = shared_item["user_email"]
item_path = shared_item["item_path"]
full_path = file_service._get_user_file_path(user_email, item_path)
if full_path.is_file():
result = await file_service.get_shared_file_content(share_id)
if result:
content, filename = result
response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: Serving shared file '{filename}' for share_id: {share_id}")
return response
else:
logger.error(f"FileBrowserView: Failed to get content for shared file: {item_path} (share_id: {share_id})")
raise web.HTTPNotFound(text="Shared file not found or inaccessible")
elif full_path.is_dir():
files = await file_service.get_shared_folder_content(share_id)
logger.info(f"FileBrowserView: Serving shared folder '{item_path}' for share_id: {share_id}")
return aiohttp_jinja2.render_template(
"pages/shared_folder.html",
self.request,
{
"request": self.request,
"files": files,
"current_path": item_path,
"share_id": share_id,
"user_email": user_email,
"active_page": "shared"
}
)
else:
logger.error(f"FileBrowserView: Shared item is neither file nor directory: {item_path} (share_id: {share_id})")
raise web.HTTPNotFound(text="Shared item not found")
async def download_shared_file_handler(self):
share_id = self.request.match_info.get("share_id")
file_path = self.request.query.get("file_path") # This is the path of the file *within* the shared item
file_service = self.request.app["file_service"]
logger.debug(f"FileBrowserView: Handling download shared file request for share_id: {share_id}, file_path: {file_path}")
if not file_path:
logger.warning("FileBrowserView: Download shared file request missing file_path query parameter.")
raise web.HTTPBadRequest(text="File path is required for download from shared folder.")
shared_item = await file_service.get_shared_item(share_id)
if not shared_item:
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id} during download.")
return aiohttp_jinja2.render_template(
"pages/errors/404.html",
self.request,
{"request": self.request, "message": "Shared link is invalid or has expired."},
status=404
)
# Ensure the shared item is a directory if a file_path is provided
user_email = shared_item["user_email"]
original_shared_item_path = file_service._get_user_file_path(user_email, shared_item["item_path"])
if not original_shared_item_path.is_dir():
logger.warning(f"FileBrowserView: Attempt to download a specific file from a shared item that is not a directory. Share_id: {share_id}")
raise web.HTTPBadRequest(text="Cannot download specific files from a shared item that is not a folder.")
result = await file_service.get_shared_file_content(share_id, file_path)
if result:
content, filename = result
response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: Serving shared file '{filename}' from shared folder for share_id: {share_id}")
return response
else:
logger.error(f"FileBrowserView: Failed to get content for shared file: {file_path} (share_id: {share_id}) from shared folder.")
raise web.HTTPNotFound(text="Shared file not found or inaccessible within the shared folder.")
class OrderView(CustomPydanticView):
@ -323,7 +460,7 @@ class UserManagementView(web.View):
elif route_name == "user_details":
return await self.user_details_page()
return web.HTTPNotFound()
raise web.HTTPNotFound()
@login_required
async def post(self):
@ -336,7 +473,7 @@ class UserManagementView(web.View):
elif route_name == "delete_user_page":
return await self.delete_user_submit()
return web.HTTPNotFound()
raise web.HTTPNotFound()
async def add_user_page(self):
return aiohttp_jinja2.render_template(
@ -403,7 +540,7 @@ class UserManagementView(web.View):
user_service.update_user_quota(email, float(storage_quota_gb))
return web.HTTPFound(
raise web.HTTPFound(
self.request.app.router["users"].url_for().with_query(
success=f"User {email} added successfully"
)
@ -429,7 +566,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email)
if not user_data:
return web.HTTPNotFound(text="User not found")
raise web.HTTPNotFound(text="User not found")
success_message = self.request.query.get("success")
@ -464,7 +601,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email)
if not user_data:
return web.HTTPNotFound(text="User not found")
raise web.HTTPNotFound(text="User not found")
if errors:
return aiohttp_jinja2.render_template(
@ -481,7 +618,7 @@ class UserManagementView(web.View):
user_service.update_user_quota(email, storage_quota_gb)
return web.HTTPFound(
raise web.HTTPFound(
self.request.app.router["edit_user"].url_for(email=email).with_query(
success="User quota updated successfully"
)
@ -494,7 +631,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email)
if not user_data:
return web.HTTPNotFound(text="User not found")
raise web.HTTPNotFound(text="User not found")
return aiohttp_jinja2.render_template(
"pages/user_details.html",
@ -514,11 +651,11 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email)
if not user_data:
return web.HTTPNotFound(text="User not found")
raise web.HTTPNotFound(text="User not found")
user_service.delete_user(email)
return web.HTTPFound(
raise web.HTTPFound(
self.request.app.router["users"].url_for().with_query(
success=f"User {email} deleted successfully"
)

50
retoors/views/upload.py Normal file
View File

@ -0,0 +1,50 @@
from aiohttp import web
import aiohttp_jinja2
from aiohttp.web_response import json_response
from ..helpers.auth import login_required
class UploadView(web.View):
@login_required
async def post(self):
user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"]
# Get current path from query parameter or form data
current_path = self.request.query.get("current_path", "")
try:
reader = await self.request.multipart()
files_uploaded = []
errors = []
while True:
field = await reader.next()
if field is None:
break
# Check if the field is a file input
if field.name == "file": # Assuming the input field name is 'file'
filename = field.filename
if not filename:
errors.append("Filename is required for one of the files.")
continue
content = await field.read()
# Construct the full file path relative to the user's base directory
full_file_path_for_service = f"{current_path}/{filename}" if current_path else filename
success = await file_service.upload_file(user_email, full_file_path_for_service, content)
if success:
files_uploaded.append(filename)
else:
errors.append(f"Failed to upload file '{filename}'")
if errors:
return json_response({"status": "error", "message": "Some files failed to upload", "details": errors}, status=500)
elif files_uploaded:
return json_response({"status": "success", "message": f"Successfully uploaded {len(files_uploaded)} files", "files": files_uploaded})
else:
return json_response({"status": "error", "message": "No files were uploaded"}, status=400)
except Exception as e:
return json_response({"status": "error", "message": f"Upload error: {str(e)}"}, status=500)

View File

@ -5,6 +5,59 @@ from retoors.main import create_app
from retoors.services.config_service import ConfigService
from pytest_mock import MockerFixture # Import MockerFixture
import datetime # Import datetime
from aiohttp.test_utils import TestClient # Import TestClient
from aiohttp_session import setup as setup_session # Import setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage # Import EncryptedCookieStorage
from retoors.helpers.env_manager import get_or_create_session_secret_key # Import get_or_create_session_secret_key
from retoors.middlewares import user_middleware, error_middleware # Import middlewares
from retoors.services.user_service import UserService # Import UserService
from retoors.routes import setup_routes # Import setup_routes
import aiohttp_jinja2 # Import aiohttp_jinja2
import jinja2 # Import jinja2
import aiohttp # Import aiohttp
from retoors.services.file_service import FileService # Import FileService
@pytest.fixture
def temp_user_files_dir(tmp_path):
"""Fixture to create a temporary directory for user files."""
user_files_dir = tmp_path / "user_files"
user_files_dir.mkdir()
return user_files_dir
@pytest.fixture
def temp_users_json(tmp_path):
"""Fixture to create a temporary users.json file."""
users_json_path = tmp_path / "users.json"
initial_users_data = [
{
"email": "test@example.com",
"full_name": "Test User",
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": None,
"shared_items": {}
},
{
"email": "child@example.com",
"full_name": "Child User",
"email": "child@example.com",
"password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": "test@example.com",
"shared_items": {}
}
]
with open(users_json_path, "w") as f:
json.dump(initial_users_data, f)
return users_json_path
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
"""Fixture to provide a FileService instance with temporary directories."""
return FileService(temp_user_files_dir, temp_users_json)
@pytest.fixture
@ -12,6 +65,42 @@ def create_app_instance():
"""Fixture to create a new aiohttp application instance."""
return create_app()
@pytest.fixture
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
"""Fixture to create a test aiohttp application with mocked services."""
from aiohttp import web
app = web.Application()
# Setup session for the test app
project_root = Path(__file__).parent.parent
env_file_path = project_root / ".env"
secret_key = get_or_create_session_secret_key(env_file_path)
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
app.middlewares.append(error_middleware)
app.middlewares.append(user_middleware)
# Mock UserService
mock_user_service = mocker.MagicMock(spec=UserService)
# Mock scheduler
mock_scheduler = mocker.MagicMock()
mock_scheduler.spawn = mocker.AsyncMock()
mock_scheduler.close = mocker.AsyncMock()
app["user_service"] = mock_user_service
app["file_service"] = file_service_instance
app["scheduler"] = mock_scheduler
# Setup Jinja2 for templates
base_path = Path(__file__).parent.parent / "retoors"
templates_path = base_path / "templates"
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
setup_routes(app)
return app
@pytest.fixture(scope="function")
def mock_users_db_fixture():
@ -39,9 +128,8 @@ def mock_users_db_fixture():
"storage_quota_gb": 50,
"storage_used_gb": 5,
"parent_email": "admin@example.com",
"reset_token": None,
"reset_token_expiry": None,
},
"shared_items": {}
}
}
@ -242,6 +330,118 @@ async def client(
config_file.unlink(missing_ok=True) # Use missing_ok for robustness
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
return client
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in admin user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "admin@example.com", "password": "password"}
)
return client
@pytest.fixture
def mock_send_email(mocker: MockerFixture):
"""

View File

@ -4,6 +4,7 @@ from pathlib import Path
import json
import datetime
import aiohttp
from aiohttp import web
from aiohttp.test_utils import TestClient
from aiohttp_session import setup as setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
@ -12,157 +13,9 @@ from retoors.helpers.env_manager import get_or_create_session_secret_key
# Assuming the FileService is in retoors/services/file_service.py
# and the FileBrowserView is in retoors/views/site.py
@pytest.fixture
def temp_user_files_dir(tmp_path):
"""Fixture to create a temporary directory for user files."""
user_files_dir = tmp_path / "user_files"
user_files_dir.mkdir()
return user_files_dir
@pytest.fixture
def temp_users_json(tmp_path):
"""Fixture to create a temporary users.json file."""
users_json_path = tmp_path / "users.json"
initial_users_data = {
"test@example.com": {
"full_name": "Test User",
"email": "test@example.com",
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": None,
"shared_items": {}
},
"child@example.com": {
"full_name": "Child User",
"email": "child@example.com",
"password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": "test@example.com",
"shared_items": {}
}
}
with open(users_json_path, "w") as f:
json.dump(initial_users_data, f)
return users_json_path
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
"""Fixture to provide a FileService instance with temporary directories."""
from retoors.services.file_service import FileService
return FileService(temp_user_files_dir, temp_users_json)
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
return client
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in admin user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "admin@example.com", "password": "password"}
)
return client
# --- FileService Tests ---
@ -310,10 +163,10 @@ async def test_file_service_shared_link_expiry(file_service_instance, mocker):
await file_service_instance.upload_file(user_email, file_path, b"expiring content")
share_id = await file_service_instance.generate_share_link(user_email, file_path)
# Mock datetime to simulate an expired link
# Mock datetime to simulate an expired link (after generating the link)
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime))
datetime.datetime.now.return_value = future_time
mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime)
mock_datetime.datetime.now = mocker.Mock(return_value=future_time)
shared_item = await file_service_instance.get_shared_item(share_id)
assert shared_item is None
@ -355,6 +208,25 @@ async def test_file_browser_new_folder(logged_in_client: TestClient, file_servic
expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
assert expected_path.is_dir()
@pytest.mark.asyncio
async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False)
assert resp.status == 302
assert "error=Folder+name+is+required" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "existing_folder_web"
await file_service_instance.create_folder(user_email, folder_name) # Create it first
# Mock create_folder to return False, simulating it already exists or failed
mocker.patch.object(file_service_instance, "create_folder", return_value=False)
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False)
assert resp.status == 302
assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
@ -368,8 +240,7 @@ async def test_file_browser_upload_file(logged_in_client: TestClient, file_servi
content_type='text/plain')
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
assert resp.status == 200
expected_path = temp_user_files_dir / user_email / "uploaded.txt"
assert expected_path.is_file()
@ -389,8 +260,9 @@ async def test_file_browser_download_file(logged_in_client: TestClient, file_ser
@pytest.mark.asyncio
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
assert resp.status == 404
response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
assert response.status == 404
assert "File not found" in await response.text()
@pytest.mark.asyncio
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
@ -422,61 +294,272 @@ async def test_file_browser_delete_folder(logged_in_client: TestClient, file_ser
assert not expected_path.is_dir()
@pytest.mark.asyncio
async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance):
async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
file_name = "web_share.txt"
await file_service_instance.upload_file(user_email, file_name, b"shareable content")
file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
paths_to_delete = [f"{name}" for name in file_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
for name in file_names:
expected_path = temp_user_files_dir / user_email / name
assert not expected_path.is_file()
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"]
for name in folder_names:
await file_service_instance.create_folder(user_email, name)
await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content")
paths_to_delete = [f"{name}" for name in folder_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
for name in folder_names:
expected_path = temp_user_files_dir / user_email / name
assert not expected_path.is_dir()
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False)
assert resp.status == 302
assert "error=No+items+selected+for+deletion" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, temp_user_files_dir, mocker):
user_email = "test@example.com"
file_names = ["fail_delete_1.txt", "fail_delete_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
paths_to_delete = [f"{name}" for name in file_names]
# Mock delete_item to fail for the first item
original_delete_item = file_service_instance.delete_item
async def mock_delete_item(email, path):
if path == file_names[0]:
return False # Simulate failure for the first item
return await original_delete_item(email, path)
mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item)
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert "error=Some+items+failed+to+delete" in resp.headers["Location"]
# Check if the first file still exists (failed to delete)
assert (temp_user_files_dir / user_email / file_names[0]).is_file()
# Check if the second file is deleted (succeeded)
assert not (temp_user_files_dir / user_email / file_names[1]).is_file()
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share_multiple", json={"paths": []})
assert resp.status == 400
data = await resp.json()
assert data["error"] == "No items selected for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_missing_path(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL
assert resp.status == 400
data = await resp.json()
assert data["error"] == "File path is required for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "fail_share_link.txt"
await file_service_instance.upload_file(user_email, file_name, b"content")
mocker.patch.object(file_service_instance, "generate_share_link", return_value=None)
resp = await logged_in_client.post(f"/files/share/{file_name}")
assert resp.status == 200
assert resp.status == 500
data = await resp.json()
assert "share_link" in data
assert "http" in data["share_link"] # Check if it's a URL
assert data["error"] == "Failed to generate share link"
# Verify the shared item can be retrieved
# The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID
share_id = data["share_link"].split("/")[-1]
shared_item = await file_service_instance.get_shared_item(share_id)
assert shared_item is not None
assert shared_item["item_path"] == file_name
@pytest.mark.asyncio
async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL
assert resp.status == 302
assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"]
@pytest.fixture
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
"""Fixture to create a test aiohttp application with mocked services."""
from aiohttp import web
from retoors.middlewares import user_middleware, error_middleware
from retoors.services.user_service import UserService
from retoors.routes import setup_routes
import aiohttp_jinja2
import jinja2
@pytest.mark.asyncio
async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "fail_delete.txt"
await file_service_instance.upload_file(user_email, file_name, b"content")
app = web.Application()
mocker.patch.object(file_service_instance, "delete_item", return_value=False)
# Setup session for the test app
project_root = Path(__file__).parent.parent
env_file_path = project_root / ".env"
secret_key = get_or_create_session_secret_key(env_file_path)
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
assert resp.status == 302
assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"]
app.middlewares.append(error_middleware)
app.middlewares.append(user_middleware)
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "shared_folder"
file_name = "nested.txt"
share_id = "test_share_id"
# Mock UserService
mock_user_service = mocker.MagicMock(spec=UserService)
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": folder_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: True))
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
# Mock scheduler
mock_scheduler = mocker.MagicMock()
mock_scheduler.spawn = mocker.AsyncMock()
mock_scheduler.close = mocker.AsyncMock()
resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible within the shared folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
app["user_service"] = mock_user_service
app["file_service"] = file_service_instance
app["scheduler"] = mock_scheduler
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: False))
resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt")
assert resp.status == 400
text = await resp.text()
assert "Cannot download specific files from a shared item that is not a folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt")
assert resp.status == 404
text = await resp.text()
assert "Shared link is invalid or has expired." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient):
resp = await client.get("/shared_file/some_share_id/download")
assert resp.status == 400
assert "File path is required for download from shared folder." in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
item_path = "mystery_item"
share_id = "test_share_id"
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": item_path,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
# Mock Path.is_file and Path.is_dir to return False
mocker.patch("pathlib.Path.is_file", return_value=False)
mocker.patch("pathlib.Path.is_dir", return_value=False)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared item not found" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id")
assert resp.status == 404
text = await resp.text()
assert "Shared link is invalid or has expired." in text
@pytest.mark.asyncio
async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker):
# Mock the route name to simulate an unknown action
mock_route = mocker.MagicMock(name="unknown_action")
mock_route.current_app = logged_in_client.app # Provide a mock current_app
mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route})
resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False)
assert resp.status == 400
assert "Unknown file action" in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_names = ["fail_share_1.txt", "fail_share_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content to share")
paths_to_share = [f"{name}" for name in file_names]
# Mock generate_share_link to fail for the first item
original_generate_share_link = file_service_instance.generate_share_link
async def mock_generate_share_link(email, path):
if path == file_names[0]:
return None # Simulate failure for the first item
return await original_generate_share_link(email, path)
mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link)
resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share})
assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds
data = await resp.json()
assert "share_links" in data
assert len(data["share_links"]) == 1 # Only one link should be generated
assert data["share_links"][0]["name"] == file_names[1] # The successful one
# Setup Jinja2 for templates
base_path = Path(__file__).parent.parent / "retoors"
templates_path = base_path / "templates"
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
setup_routes(app)
return app

View File

@ -133,34 +133,44 @@ async def test_privacy_get(client):
assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text
async def test_shared_get_unauthorized(client):
resp = await client.get("/shared", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_shared_get_authorized(logged_in_client):
resp = await logged_in_client.get("/shared")
assert resp.status == 200
text = await resp.text()
assert "Shared with me" in text
assert "Files and folders that have been shared with you will appear here." in text
async def test_recent_get_unauthorized(client):
resp = await client.get("/recent", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_recent_get_authorized(logged_in_client):
resp = await logged_in_client.get("/recent")
assert resp.status == 200
text = await resp.text()
assert "Recent Files" in text
assert "Your recently accessed files will appear here." in text
async def test_favorites_get_unauthorized(client):
resp = await client.get("/favorites", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_favorites_get_authorized(logged_in_client):
resp = await logged_in_client.get("/favorites")
assert resp.status == 200
text = await resp.text()
assert "Favorites" in text
assert "Your favorite files and folders will appear here." in text
async def test_trash_get_unauthorized(client):
resp = await client.get("/trash", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_trash_get_authorized(logged_in_client):
resp = await logged_in_client.get("/trash")
assert resp.status == 200
text = await resp.text()
assert "Trash" in text
assert "Files and folders you have deleted will appear here." in text
async def test_file_browser_get_unauthorized(client):
resp = await client.get("/files", allow_redirects=False)
assert resp.status == 302
assert resp.headers["Location"] == "/login"
async def test_users_get_authorized(logged_in_client):
resp = await logged_in_client.get("/users")
assert resp.status == 200
text = await resp.text()
assert "User Management" in text
assert "+ Add New User" in text
async def test_file_browser_get_authorized(client):