Compare commits
No commits in common. "f54941dd80bb1e415159758dc44c93df904fb74c" and "8d740be5fb2429340f720325db2b2e75b1c74867" have entirely different histories.
f54941dd80
...
8d740be5fb
@ -9,4 +9,3 @@ pytest
|
||||
pytest-aiohttp
|
||||
aiohttp-test-utils
|
||||
pytest-mock
|
||||
pillow
|
||||
|
||||
@ -6,17 +6,13 @@ from ..services.user_service import UserService # Import UserService
|
||||
def login_required(func):
|
||||
@wraps(func)
|
||||
async def wrapper(self, *args, **kwargs):
|
||||
if not getattr(self, 'request', None):
|
||||
request = self
|
||||
else:
|
||||
request = self.request
|
||||
session = await get_session(request)
|
||||
session = await get_session(self.request)
|
||||
user_email = session.get('user_email')
|
||||
|
||||
if not user_email:
|
||||
raise web.HTTPFound('/login')
|
||||
|
||||
user_service: UserService = request.app["user_service"]
|
||||
user_service: UserService = self.request.app["user_service"]
|
||||
user = user_service.get_user_by_email(user_email)
|
||||
|
||||
if not user:
|
||||
@ -25,6 +21,6 @@ def login_required(func):
|
||||
raise web.HTTPFound('/login')
|
||||
|
||||
# Ensure the user object is available in the request for views
|
||||
request["user"] = user
|
||||
self.request["user"] = user
|
||||
return await func(self, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
@ -11,7 +11,7 @@ from .routes import setup_routes
|
||||
from .services.user_service import UserService
|
||||
from .services.config_service import ConfigService
|
||||
from .services.file_service import FileService # Import FileService
|
||||
from .middlewares import user_middleware, error_middleware,request_hybrid_middleware
|
||||
from .middlewares import user_middleware, error_middleware
|
||||
from .helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key # Import new function
|
||||
|
||||
|
||||
@ -40,7 +40,6 @@ def create_app():
|
||||
|
||||
# The order of middleware registration matters.
|
||||
# They are executed in the order they are added.
|
||||
app.middlewares.append(request_hybrid_middleware)
|
||||
app.middlewares.append(error_middleware)
|
||||
|
||||
# Setup session
|
||||
|
||||
@ -1,10 +1,6 @@
|
||||
from aiohttp import web
|
||||
from aiohttp_session import get_session
|
||||
|
||||
@web.middleware
|
||||
async def request_hybrid_middleware(request, handler):
|
||||
setattr(request,'request', request)
|
||||
return await handler(request)
|
||||
|
||||
@web.middleware
|
||||
async def user_middleware(request, handler):
|
||||
@ -24,4 +20,3 @@ async def error_middleware(request, handler):
|
||||
raise # Re-raise HTTPException to see original traceback
|
||||
except Exception:
|
||||
raise # Re-raise generic Exception to see original traceback
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
from .views.auth import LoginView, RegistrationView, LogoutView, ForgotPasswordView, ResetPasswordView
|
||||
from .views.site import SiteView, OrderView, FileBrowserView, UserManagementView
|
||||
from .views.upload import UploadView
|
||||
from .views.admin import get_users, add_user, update_user_quota, delete_user, get_user_details, delete_team
|
||||
|
||||
|
||||
@ -31,14 +30,10 @@ def setup_routes(app):
|
||||
app.router.add_post("/users/{email}/delete", UserManagementView, name="delete_user_page")
|
||||
app.router.add_view("/files", FileBrowserView, name="file_browser")
|
||||
app.router.add_post("/files/new_folder", FileBrowserView, name="new_folder")
|
||||
app.router.add_post("/files/upload", UploadView, name="upload_file")
|
||||
app.router.add_get("/files/download/{file_path:.*}", FileBrowserView.get_download_file, name="download_file")
|
||||
app.router.add_post("/files/upload", FileBrowserView, name="upload_file")
|
||||
app.router.add_get("/files/download/{file_path:.*}", FileBrowserView, name="download_file")
|
||||
app.router.add_post("/files/share/{file_path:.*}", FileBrowserView, name="share_file")
|
||||
app.router.add_post("/files/delete/{file_path:.*}", FileBrowserView, name="delete_item")
|
||||
app.router.add_post("/files/delete_multiple", FileBrowserView, name="delete_multiple_items")
|
||||
app.router.add_post("/files/share_multiple", FileBrowserView, name="share_multiple_items")
|
||||
app.router.add_get("/shared_file/{share_id}", FileBrowserView.shared_file_handler, name="shared_file")
|
||||
app.router.add_get("/shared_file/{share_id}/download", FileBrowserView.download_shared_file_handler, name="download_shared_file")
|
||||
|
||||
# Admin API routes for user and team management
|
||||
app.router.add_get("/api/users", get_users, name="api_get_users")
|
||||
|
||||
@ -12,11 +12,8 @@ class ConfigService:
|
||||
def _load_config(self):
|
||||
config_from_file = {}
|
||||
if self._config_path.exists():
|
||||
try:
|
||||
with open(self._config_path, "r") as f:
|
||||
config_from_file = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
config_from_file = {}
|
||||
|
||||
# Override with environment variables
|
||||
config_from_env = {
|
||||
|
||||
@ -4,53 +4,35 @@ from pathlib import Path
|
||||
import shutil
|
||||
import uuid
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
handler = logging.StreamHandler()
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
class FileService:
|
||||
def __init__(self, base_dir: Path, users_data_path: Path):
|
||||
self.base_dir = base_dir
|
||||
self.users_data_path = users_data_path
|
||||
self.base_dir.mkdir(parents=True, exist_ok=True)
|
||||
logger.info(f"FileService initialized with base_dir: {self.base_dir} and users_data_path: {self.users_data_path}")
|
||||
|
||||
async def _load_users_data(self):
|
||||
"""Loads user data from the JSON file."""
|
||||
if not self.users_data_path.exists():
|
||||
logger.warning(f"users_data_path does not exist: {self.users_data_path}")
|
||||
return []
|
||||
return {}
|
||||
async with aiofiles.open(self.users_data_path, mode="r") as f:
|
||||
content = await f.read()
|
||||
try:
|
||||
return json.loads(content) if content else []
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"JSONDecodeError when loading users data from {self.users_data_path}")
|
||||
return []
|
||||
return json.loads(content) if content else {}
|
||||
|
||||
async def _save_users_data(self, data):
|
||||
"""Saves user data to the JSON file."""
|
||||
async with aiofiles.open(self.users_data_path, mode="w") as f:
|
||||
await f.write(json.dumps(data, indent=4))
|
||||
logger.debug(f"Saved users data to {self.users_data_path}")
|
||||
|
||||
def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
|
||||
"""Constructs the absolute path for a user's file or directory."""
|
||||
user_dir = self.base_dir / user_email
|
||||
full_path = user_dir / relative_path
|
||||
logger.debug(f"Constructed path for user '{user_email}', relative_path '{relative_path}': {full_path}")
|
||||
return full_path
|
||||
return user_dir / relative_path
|
||||
|
||||
async def list_files(self, user_email: str, path: str = "") -> list:
|
||||
"""Lists files and directories for a given user within a specified path."""
|
||||
user_path = self._get_user_file_path(user_email, path)
|
||||
if not user_path.is_dir():
|
||||
logger.warning(f"list_files: User path is not a directory or does not exist: {user_path}")
|
||||
return []
|
||||
|
||||
files_list = []
|
||||
@ -65,17 +47,14 @@ class FileService:
|
||||
"last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
|
||||
}
|
||||
files_list.append(file_info)
|
||||
logger.debug(f"Listed {len(files_list)} items for user '{user_email}' in path '{path}'")
|
||||
return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
|
||||
|
||||
async def create_folder(self, user_email: str, folder_path: str) -> bool:
|
||||
"""Creates a new folder for the user."""
|
||||
full_path = self._get_user_file_path(user_email, folder_path)
|
||||
if full_path.exists():
|
||||
logger.warning(f"create_folder: Folder already exists: {full_path}")
|
||||
return False # Folder already exists
|
||||
full_path.mkdir(parents=True, exist_ok=True)
|
||||
logger.info(f"create_folder: Folder created: {full_path}")
|
||||
return True
|
||||
|
||||
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
|
||||
@ -84,49 +63,38 @@ class FileService:
|
||||
full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
|
||||
async with aiofiles.open(full_path, mode="wb") as f:
|
||||
await f.write(content)
|
||||
logger.info(f"upload_file: File uploaded to: {full_path}")
|
||||
return True
|
||||
|
||||
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
|
||||
"""Downloads a file for the user."""
|
||||
full_path = self._get_user_file_path(user_email, file_path)
|
||||
logger.debug(f"download_file: Attempting to download file from: {full_path}")
|
||||
if full_path.is_file():
|
||||
async with aiofiles.open(full_path, mode="rb") as f:
|
||||
content = await f.read()
|
||||
logger.info(f"download_file: Successfully read file: {full_path}")
|
||||
return content, full_path.name
|
||||
logger.warning(f"download_file: File not found or is not a file: {full_path}")
|
||||
return None
|
||||
|
||||
async def delete_item(self, user_email: str, item_path: str) -> bool:
|
||||
"""Deletes a file or folder for the user."""
|
||||
full_path = self._get_user_file_path(user_email, item_path)
|
||||
logger.debug(f"delete_item: Attempting to delete item: {full_path}")
|
||||
if not full_path.exists():
|
||||
logger.warning(f"delete_item: Item does not exist: {full_path}")
|
||||
return False
|
||||
|
||||
if full_path.is_file():
|
||||
full_path.unlink()
|
||||
logger.info(f"delete_item: File deleted: {full_path}")
|
||||
elif full_path.is_dir():
|
||||
shutil.rmtree(full_path)
|
||||
logger.info(f"delete_item: Directory deleted: {full_path}")
|
||||
return True
|
||||
|
||||
async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
|
||||
"""Generates a shareable link for a file or folder."""
|
||||
logger.debug(f"generate_share_link: Generating link for user '{user_email}', item '{item_path}'")
|
||||
users_data = await self._load_users_data()
|
||||
user = next((u for u in users_data if u.get("email") == user_email), None)
|
||||
user = users_data.get(user_email)
|
||||
if not user:
|
||||
logger.warning(f"generate_share_link: User not found: {user_email}")
|
||||
return None
|
||||
|
||||
full_path = self._get_user_file_path(user_email, item_path)
|
||||
if not full_path.exists():
|
||||
logger.warning(f"generate_share_link: Item does not exist: {full_path}")
|
||||
return None
|
||||
|
||||
share_id = str(uuid.uuid4())
|
||||
@ -139,60 +107,37 @@ class FileService:
|
||||
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
|
||||
}
|
||||
await self._save_users_data(users_data)
|
||||
logger.info(f"generate_share_link: Share link generated with ID: {share_id} for item: {item_path}")
|
||||
return share_id
|
||||
|
||||
async def get_shared_item(self, share_id: str) -> dict | None:
|
||||
"""Retrieves information about a shared item."""
|
||||
logger.debug(f"get_shared_item: Retrieving shared item with ID: {share_id}")
|
||||
users_data = await self._load_users_data()
|
||||
for user_info in users_data:
|
||||
for user_email, user_info in users_data.items():
|
||||
if "shared_items" in user_info and share_id in user_info["shared_items"]:
|
||||
shared_item = user_info["shared_items"][share_id]
|
||||
expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
|
||||
if expiry_time > datetime.datetime.now(datetime.timezone.utc):
|
||||
logger.info(f"get_shared_item: Found valid shared item for ID: {share_id}")
|
||||
return shared_item
|
||||
else:
|
||||
logger.warning(f"get_shared_item: Shared item {share_id} has expired.")
|
||||
logger.warning(f"get_shared_item: No valid shared item found for ID: {share_id}")
|
||||
return None
|
||||
|
||||
async def get_shared_file_content(self, share_id: str, requested_file_path: str | None = None) -> tuple[bytes, str] | None:
|
||||
async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None:
|
||||
"""Retrieves the content of a shared file."""
|
||||
logger.debug(f"get_shared_file_content: Retrieving content for shared file with ID: {share_id}, requested_file_path: {requested_file_path}")
|
||||
shared_item = await self.get_shared_item(share_id)
|
||||
if not shared_item:
|
||||
return None
|
||||
|
||||
user_email = shared_item["user_email"]
|
||||
item_path = shared_item["item_path"] # This is the path of the originally shared item (file or folder)
|
||||
item_path = shared_item["item_path"]
|
||||
full_path = self._get_user_file_path(user_email, item_path)
|
||||
|
||||
# Construct the full path to the originally shared item
|
||||
full_shared_item_path = self._get_user_file_path(user_email, item_path)
|
||||
|
||||
target_file_path = full_shared_item_path
|
||||
if requested_file_path:
|
||||
# If a specific file within a shared folder is requested
|
||||
target_file_path = self._get_user_file_path(user_email, requested_file_path)
|
||||
# Security check: Ensure the requested file is actually within the shared item's directory
|
||||
try:
|
||||
target_file_path.relative_to(full_shared_item_path)
|
||||
except ValueError:
|
||||
logger.warning(f"get_shared_file_content: Requested file path '{requested_file_path}' is not within shared item path '{item_path}' for share_id: {share_id}")
|
||||
return None
|
||||
|
||||
if target_file_path.is_file():
|
||||
async with aiofiles.open(target_file_path, mode="rb") as f:
|
||||
if full_path.is_file():
|
||||
async with aiofiles.open(full_path, mode="rb") as f:
|
||||
content = await f.read()
|
||||
logger.info(f"get_shared_file_content: Successfully read content for shared file: {target_file_path}")
|
||||
return content, target_file_path.name
|
||||
logger.warning(f"get_shared_file_content: Shared item path is not a file or does not exist: {target_file_path}")
|
||||
return content, full_path.name
|
||||
return None
|
||||
|
||||
async def get_shared_folder_content(self, share_id: str) -> list | None:
|
||||
"""Retrieves the content of a shared folder."""
|
||||
logger.debug(f"get_shared_folder_content: Retrieving content for shared folder with ID: {share_id}")
|
||||
shared_item = await self.get_shared_item(share_id)
|
||||
if not shared_item:
|
||||
return None
|
||||
@ -202,7 +147,5 @@ class FileService:
|
||||
full_path = self._get_user_file_path(user_email, item_path)
|
||||
|
||||
if full_path.is_dir():
|
||||
logger.info(f"get_shared_folder_content: Listing files for shared folder: {full_path}")
|
||||
return await self.list_files(user_email, item_path)
|
||||
logger.warning(f"get_shared_folder_content: Shared item path is not a directory or does not exist: {full_path}")
|
||||
return None
|
||||
|
||||
@ -14,11 +14,8 @@ class UserService:
|
||||
def _load_users(self) -> List[Dict[str, Any]]:
|
||||
if not self._users_path.exists():
|
||||
return []
|
||||
try:
|
||||
with open(self._users_path, "r") as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
|
||||
def _save_users(self):
|
||||
with open(self._users_path, "w") as f:
|
||||
|
||||
@ -65,8 +65,8 @@
|
||||
}
|
||||
|
||||
.btn-small {
|
||||
padding: 0.2rem 0.4rem; /* Reduced padding */
|
||||
font-size: 0.75rem; /* Reduced font size */
|
||||
padding: 0.4rem 0.8rem;
|
||||
font-size: 0.85rem;
|
||||
border: 1px solid var(--border-color);
|
||||
border-radius: 4px;
|
||||
background-color: var(--background-color);
|
||||
@ -236,107 +236,3 @@
|
||||
min-width: 120px;
|
||||
}
|
||||
}
|
||||
|
||||
/* Styles for the new upload functionality */
|
||||
.upload-area {
|
||||
border: 2px dashed var(--border-color);
|
||||
border-radius: 8px;
|
||||
padding: 20px;
|
||||
text-align: center;
|
||||
margin-bottom: 20px;
|
||||
background-color: var(--background-color);
|
||||
}
|
||||
|
||||
.upload-button {
|
||||
display: inline-block;
|
||||
padding: 10px 20px;
|
||||
cursor: pointer;
|
||||
margin-top: 10px;
|
||||
}
|
||||
|
||||
.selected-files-preview {
|
||||
margin-top: 20px;
|
||||
text-align: left;
|
||||
max-height: 200px;
|
||||
overflow-y: auto;
|
||||
border-top: 1px solid var(--border-color);
|
||||
padding-top: 10px;
|
||||
}
|
||||
|
||||
.file-entry {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
padding: 8px 0;
|
||||
border-bottom: 1px solid var(--border-color);
|
||||
}
|
||||
|
||||
.file-entry:last-child {
|
||||
border-bottom: none;
|
||||
}
|
||||
|
||||
.file-entry .file-name {
|
||||
flex-grow: 1;
|
||||
margin-right: 10px;
|
||||
color: var(--text-color);
|
||||
}
|
||||
|
||||
.file-entry .file-size {
|
||||
color: var(--light-text-color);
|
||||
font-size: 0.85rem;
|
||||
}
|
||||
|
||||
.file-entry .thumbnail-preview {
|
||||
width: 40px;
|
||||
height: 40px;
|
||||
margin-left: 10px;
|
||||
border-radius: 4px;
|
||||
overflow: hidden;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
background-color: var(--background-color);
|
||||
border: 1px solid var(--border-color);
|
||||
}
|
||||
|
||||
.file-entry .thumbnail-preview img {
|
||||
max-width: 100%;
|
||||
max-height: 100%;
|
||||
display: block;
|
||||
object-fit: contain;
|
||||
}
|
||||
|
||||
.progress-bar-container {
|
||||
margin-top: 15px;
|
||||
text-align: left;
|
||||
border-top: 1px solid var(--border-color);
|
||||
padding-top: 10px;
|
||||
}
|
||||
|
||||
.progress-bar-container .file-name {
|
||||
font-weight: bold;
|
||||
margin-bottom: 5px;
|
||||
color: var(--text-color);
|
||||
}
|
||||
|
||||
.progress-bar-wrapper {
|
||||
width: 100%;
|
||||
background-color: var(--background-color);
|
||||
border-radius: 5px;
|
||||
overflow: hidden;
|
||||
height: 10px;
|
||||
margin-bottom: 5px;
|
||||
}
|
||||
|
||||
.progress-bar {
|
||||
height: 100%;
|
||||
width: 0%;
|
||||
background-color: var(--accent-color);
|
||||
border-radius: 5px;
|
||||
transition: width 0.3s ease-in-out;
|
||||
}
|
||||
|
||||
.progress-text {
|
||||
font-size: 0.85rem;
|
||||
color: var(--light-text-color);
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
@ -1,116 +0,0 @@
|
||||
export function showUploadModal() {
|
||||
document.getElementById('upload-modal').style.display = 'block';
|
||||
// Clear previous selections and progress
|
||||
document.getElementById('selected-files-preview').innerHTML = '';
|
||||
document.getElementById('upload-progress-container').innerHTML = '';
|
||||
document.getElementById('file-input-multiple').value = ''; // Clear selected files
|
||||
document.getElementById('start-upload-btn').disabled = true;
|
||||
}
|
||||
|
||||
document.addEventListener('DOMContentLoaded', () => {
|
||||
const fileInput = document.getElementById('file-input-multiple');
|
||||
const selectedFilesPreview = document.getElementById('selected-files-preview');
|
||||
const startUploadBtn = document.getElementById('start-upload-btn');
|
||||
const uploadProgressContainer = document.getElementById('upload-progress-container');
|
||||
|
||||
let filesToUpload = [];
|
||||
|
||||
fileInput.addEventListener('change', (event) => {
|
||||
filesToUpload = Array.from(event.target.files);
|
||||
selectedFilesPreview.innerHTML = ''; // Clear previous previews
|
||||
uploadProgressContainer.innerHTML = ''; // Clear previous progress bars
|
||||
|
||||
if (filesToUpload.length > 0) {
|
||||
startUploadBtn.disabled = false;
|
||||
filesToUpload.forEach(file => {
|
||||
const fileEntry = document.createElement('div');
|
||||
fileEntry.className = 'file-entry';
|
||||
fileEntry.innerHTML = `
|
||||
<span class="file-name">${file.name}</span>
|
||||
<span class="file-size">(${(file.size / 1024 / 1024).toFixed(2)} MB)</span>
|
||||
<div class="thumbnail-preview"></div>
|
||||
`;
|
||||
selectedFilesPreview.appendChild(fileEntry);
|
||||
|
||||
// Display thumbnail for image files
|
||||
if (file.type.startsWith('image/')) {
|
||||
const reader = new FileReader();
|
||||
reader.onload = (e) => {
|
||||
const img = document.createElement('img');
|
||||
img.src = e.target.result;
|
||||
fileEntry.querySelector('.thumbnail-preview').appendChild(img);
|
||||
};
|
||||
reader.readAsDataURL(file);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
startUploadBtn.disabled = true;
|
||||
}
|
||||
});
|
||||
|
||||
startUploadBtn.addEventListener('click', () => {
|
||||
if (filesToUpload.length > 0) {
|
||||
uploadFiles(filesToUpload);
|
||||
}
|
||||
});
|
||||
|
||||
async function uploadFiles(files) {
|
||||
startUploadBtn.disabled = true; // Disable button during upload
|
||||
uploadProgressContainer.innerHTML = ''; // Clear previous progress
|
||||
|
||||
const currentPath = new URLSearchParams(window.location.search).get('path') || '';
|
||||
|
||||
for (const file of files) {
|
||||
const formData = new FormData();
|
||||
formData.append('file', file);
|
||||
|
||||
const progressBarContainer = document.createElement('div');
|
||||
progressBarContainer.className = 'progress-bar-container';
|
||||
progressBarContainer.innerHTML = `
|
||||
<div class="file-name">${file.name}</div>
|
||||
<div class="progress-bar-wrapper">
|
||||
<div class="progress-bar" id="progress-${file.name.replace(/\./g, '-')}" style="width: 0%;"></div>
|
||||
</div>
|
||||
<div class="progress-text" id="progress-text-${file.name.replace(/\./g, '-')}" >0%</div>
|
||||
`;
|
||||
uploadProgressContainer.appendChild(progressBarContainer);
|
||||
|
||||
const xhr = new XMLHttpRequest();
|
||||
xhr.open('POST', `/files/upload?current_path=${encodeURIComponent(currentPath)}`, true);
|
||||
|
||||
xhr.upload.addEventListener('progress', (event) => {
|
||||
if (event.lengthComputable) {
|
||||
const percent = (event.loaded / event.total) * 100;
|
||||
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `${percent}%`;
|
||||
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `${Math.round(percent)}%`;
|
||||
}
|
||||
});
|
||||
|
||||
xhr.addEventListener('load', () => {
|
||||
if (xhr.status === 200) {
|
||||
console.log(`File ${file.name} uploaded successfully.`);
|
||||
// Update progress to 100% on completion
|
||||
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `100%`;
|
||||
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `100% (Done)`;
|
||||
} else {
|
||||
console.error(`Error uploading ${file.name}: ${xhr.statusText}`);
|
||||
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Failed (${xhr.status})`;
|
||||
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
|
||||
}
|
||||
});
|
||||
|
||||
xhr.addEventListener('error', () => {
|
||||
console.error(`Network error uploading ${file.name}.`);
|
||||
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Network Error`;
|
||||
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
|
||||
});
|
||||
|
||||
xhr.send(formData);
|
||||
}
|
||||
// After all files are sent, refresh the page to show new files
|
||||
// A small delay to allow server to process and update file list
|
||||
setTimeout(() => {
|
||||
window.location.reload();
|
||||
}, 1000);
|
||||
}
|
||||
});
|
||||
@ -1,6 +1,5 @@
|
||||
import './components/slider.js';
|
||||
import './components/navigation.js'; // Assuming navigation.js might be needed globally
|
||||
import { showUploadModal } from './components/upload.js'; // Import showUploadModal
|
||||
|
||||
document.addEventListener('DOMContentLoaded', () => {
|
||||
// Logic for custom-slider on order page
|
||||
@ -83,262 +82,4 @@ document.addEventListener('DOMContentLoaded', () => {
|
||||
// Initial display based on active button (default to monthly)
|
||||
updatePricingDisplay('monthly');
|
||||
}
|
||||
|
||||
// --- File Browser Specific Logic ---
|
||||
|
||||
// Helper functions for modals
|
||||
function showNewFolderModal() {
|
||||
document.getElementById('new-folder-modal').style.display = 'block';
|
||||
}
|
||||
|
||||
function closeModal(modalId) {
|
||||
document.getElementById(modalId).style.display = 'none';
|
||||
}
|
||||
window.closeModal = closeModal; // Make it globally accessible
|
||||
|
||||
window.onclick = function(event) {
|
||||
if (event.target.classList.contains('modal')) {
|
||||
event.target.style.display = 'none';
|
||||
}
|
||||
}
|
||||
|
||||
// File actions
|
||||
function downloadFile(path) {
|
||||
window.location.href = `/files/download/${path}`;
|
||||
}
|
||||
|
||||
async function shareFile(paths, names) {
|
||||
const modal = document.getElementById('share-modal');
|
||||
const linkContainer = document.getElementById('share-link-container');
|
||||
const loading = document.getElementById('share-loading');
|
||||
const shareLinkInput = document.getElementById('share-link-input');
|
||||
const shareFileName = document.getElementById('share-file-name');
|
||||
const shareLinksList = document.getElementById('share-links-list'); // New element for multiple links
|
||||
|
||||
// Clear previous content
|
||||
shareLinkInput.value = '';
|
||||
if (shareLinksList) shareLinksList.innerHTML = '';
|
||||
linkContainer.style.display = 'none';
|
||||
loading.style.display = 'block';
|
||||
modal.style.display = 'block';
|
||||
|
||||
if (paths.length === 1) {
|
||||
shareFileName.textContent = `Sharing: ${names[0]}`;
|
||||
} else {
|
||||
shareFileName.textContent = `Sharing ${paths.length} items`;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(`/files/share_multiple`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: JSON.stringify({ paths: paths })
|
||||
});
|
||||
const data = await response.json();
|
||||
|
||||
if (data.share_links && data.share_links.length > 0) {
|
||||
if (data.share_links.length === 1) {
|
||||
shareLinkInput.value = data.share_links[0];
|
||||
linkContainer.style.display = 'block';
|
||||
} else {
|
||||
// Display multiple links
|
||||
if (!shareLinksList) {
|
||||
// Create the list if it doesn't exist
|
||||
const ul = document.createElement('ul');
|
||||
ul.id = 'share-links-list';
|
||||
linkContainer.appendChild(ul);
|
||||
shareLinksList = ul;
|
||||
}
|
||||
data.share_links.forEach(item => {
|
||||
const li = document.createElement('li');
|
||||
li.innerHTML = `<strong>${item.name}:</strong> <input type="text" value="${item.link}" readonly class="form-input share-link-item-input"> <button class="btn-primary copy-share-link-item-btn" data-link="${item.link}">Copy</button>`;
|
||||
shareLinksList.appendChild(li);
|
||||
});
|
||||
linkContainer.style.display = 'block';
|
||||
}
|
||||
loading.style.display = 'none';
|
||||
} else {
|
||||
loading.textContent = 'Error generating share link(s)';
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error sharing files:', error);
|
||||
loading.textContent = 'Error generating share link(s)';
|
||||
}
|
||||
}
|
||||
|
||||
function copyShareLink() {
|
||||
const input = document.getElementById('share-link-input');
|
||||
input.select();
|
||||
document.execCommand('copy');
|
||||
alert('Share link copied to clipboard');
|
||||
}
|
||||
|
||||
function deleteFile(paths, names) {
|
||||
const deleteForm = document.getElementById('delete-form');
|
||||
const deleteMessage = document.getElementById('delete-message');
|
||||
const deleteModal = document.getElementById('delete-modal');
|
||||
|
||||
// Clear previous hidden inputs
|
||||
deleteForm.querySelectorAll('input[name="paths[]"]').forEach(input => input.remove());
|
||||
|
||||
if (Array.isArray(paths) && paths.length > 1) {
|
||||
deleteMessage.textContent = `Are you sure you want to delete ${paths.length} items? This action cannot be undone.`;
|
||||
deleteForm.action = `/files/delete_multiple`;
|
||||
paths.forEach(path => {
|
||||
const input = document.createElement('input');
|
||||
input.type = 'hidden';
|
||||
input.name = 'paths[]';
|
||||
input.value = path;
|
||||
deleteForm.appendChild(input);
|
||||
});
|
||||
} else {
|
||||
const path = Array.isArray(paths) ? paths[0] : paths;
|
||||
const name = Array.isArray(names) ? names[0] : names;
|
||||
deleteMessage.textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
|
||||
deleteForm.action = `/files/delete/${path}`;
|
||||
}
|
||||
deleteModal.style.display = 'block';
|
||||
}
|
||||
|
||||
// Selection and action buttons
|
||||
function updateActionButtons() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
const downloadBtn = document.getElementById('download-selected-btn');
|
||||
const shareBtn = document.getElementById('share-selected-btn');
|
||||
const deleteBtn = document.getElementById('delete-selected-btn');
|
||||
|
||||
const hasSelection = checked.length > 0;
|
||||
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
|
||||
|
||||
if (downloadBtn) downloadBtn.disabled = !hasFiles;
|
||||
if (shareBtn) shareBtn.disabled = !hasSelection;
|
||||
if (deleteBtn) deleteBtn.disabled = !hasSelection;
|
||||
}
|
||||
|
||||
function downloadSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
|
||||
downloadFile(checked[0].dataset.path);
|
||||
}
|
||||
}
|
||||
|
||||
function shareSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length > 0) {
|
||||
const paths = Array.from(checked).map(cb => cb.dataset.path);
|
||||
const names = Array.from(checked).map(cb =>
|
||||
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
|
||||
);
|
||||
shareFile(paths, names);
|
||||
}
|
||||
}
|
||||
|
||||
function deleteSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length > 0) {
|
||||
const paths = Array.from(checked).map(cb => cb.dataset.path);
|
||||
const names = Array.from(checked).map(cb =>
|
||||
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
|
||||
);
|
||||
deleteFile(paths, names);
|
||||
}
|
||||
}
|
||||
|
||||
// Event Listeners for File Browser
|
||||
const newFolderBtn = document.getElementById('new-folder-btn');
|
||||
if (newFolderBtn) {
|
||||
console.log('Attaching event listener to new-folder-btn');
|
||||
newFolderBtn.addEventListener('click', showNewFolderModal);
|
||||
}
|
||||
|
||||
const uploadBtn = document.getElementById('upload-btn');
|
||||
if (uploadBtn) {
|
||||
console.log('Attaching event listener to upload-btn');
|
||||
uploadBtn.addEventListener('click', showUploadModal);
|
||||
}
|
||||
|
||||
const createFirstFolderBtn = document.getElementById('create-first-folder-btn');
|
||||
if (createFirstFolderBtn) {
|
||||
console.log('Attaching event listener to create-first-folder-btn');
|
||||
createFirstFolderBtn.addEventListener('click', showNewFolderModal);
|
||||
}
|
||||
|
||||
const uploadFirstFileBtn = document.getElementById('upload-first-file-btn');
|
||||
if (uploadFirstFileBtn) {
|
||||
console.log('Attaching event listener to upload-first-file-btn');
|
||||
uploadFirstFileBtn.addEventListener('click', showUploadModal);
|
||||
}
|
||||
|
||||
const downloadSelectedBtn = document.getElementById('download-selected-btn');
|
||||
if (downloadSelectedBtn) {
|
||||
downloadSelectedBtn.addEventListener('click', downloadSelected);
|
||||
}
|
||||
|
||||
const shareSelectedBtn = document.getElementById('share-selected-btn');
|
||||
if (shareSelectedBtn) {
|
||||
shareSelectedBtn.addEventListener('click', shareSelected);
|
||||
}
|
||||
|
||||
const deleteSelectedBtn = document.getElementById('delete-selected-btn');
|
||||
if (deleteSelectedBtn) {
|
||||
deleteSelectedBtn.addEventListener('click', deleteSelected);
|
||||
}
|
||||
|
||||
const copyShareLinkBtn = document.getElementById('copy-share-link-btn');
|
||||
if (copyShareLinkBtn) {
|
||||
copyShareLinkBtn.addEventListener('click', copyShareLink);
|
||||
}
|
||||
|
||||
document.getElementById('select-all')?.addEventListener('change', function(e) {
|
||||
const checkboxes = document.querySelectorAll('.file-checkbox');
|
||||
checkboxes.forEach(cb => cb.checked = e.target.checked);
|
||||
updateActionButtons();
|
||||
});
|
||||
|
||||
document.querySelectorAll('.file-checkbox').forEach(cb => {
|
||||
cb.addEventListener('change', updateActionButtons);
|
||||
});
|
||||
|
||||
// Event listeners for dynamically created download/share/delete buttons
|
||||
document.addEventListener('click', (event) => {
|
||||
if (event.target.classList.contains('download-file-btn')) {
|
||||
downloadFile(event.target.dataset.path);
|
||||
} else if (event.target.classList.contains('share-file-btn')) {
|
||||
const path = event.target.dataset.path;
|
||||
const name = event.target.dataset.name;
|
||||
shareFile([path], [name]);
|
||||
} else if (event.target.classList.contains('delete-file-btn')) {
|
||||
const path = event.target.dataset.path;
|
||||
const name = event.target.dataset.name;
|
||||
deleteFile([path], [name]);
|
||||
} else if (event.target.classList.contains('copy-share-link-item-btn')) {
|
||||
const linkToCopy = event.target.dataset.link;
|
||||
navigator.clipboard.writeText(linkToCopy).then(() => {
|
||||
alert('Share link copied to clipboard');
|
||||
}).catch(err => {
|
||||
console.error('Failed to copy link: ', err);
|
||||
alert('Failed to copy link');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
document.getElementById('search-bar')?.addEventListener('input', function(e) {
|
||||
const searchTerm = e.target.value.toLowerCase();
|
||||
const rows = document.querySelectorAll('#file-list-body tr');
|
||||
|
||||
rows.forEach(row => {
|
||||
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
|
||||
if (name && name.includes(searchTerm)) {
|
||||
row.style.display = '';
|
||||
} else {
|
||||
row.style.display = 'none';
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Initial update of action buttons
|
||||
updateActionButtons();
|
||||
});
|
||||
@ -9,11 +9,11 @@
|
||||
{% block page_title %}My Files{% endblock %}
|
||||
|
||||
{% block dashboard_actions %}
|
||||
<button class="btn-primary" id="new-folder-btn">+ New</button>
|
||||
<button class="btn-outline" id="upload-btn">Upload</button>
|
||||
<button class="btn-outline" id="download-selected-btn" disabled>⬇️</button>
|
||||
<button class="btn-outline" id="share-selected-btn" disabled>đź”—</button>
|
||||
<button class="btn-outline" id="delete-selected-btn" disabled>Delete</button>
|
||||
<button class="btn-primary" onclick="showNewFolderModal()">+ New</button>
|
||||
<button class="btn-outline" onclick="showUploadModal()">Upload</button>
|
||||
<button class="btn-outline" onclick="downloadSelected()" id="download-btn" disabled>Download</button>
|
||||
<button class="btn-outline" onclick="shareSelected()" id="share-btn" disabled>Share</button>
|
||||
<button class="btn-outline" onclick="deleteSelected()" id="delete-btn" disabled>Delete</button>
|
||||
{% endblock %}
|
||||
|
||||
{% block dashboard_content %}
|
||||
@ -70,10 +70,10 @@
|
||||
<td>
|
||||
<div class="action-buttons">
|
||||
{% if not item.is_dir %}
|
||||
<button class="btn-small download-file-btn" data-path="{{ item.path }}">⬇️</button>
|
||||
<button class="btn-small" onclick="downloadFile('{{ item.path }}')">Download</button>
|
||||
{% endif %}
|
||||
<button class="btn-small share-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">đź”—</button>
|
||||
<button class="btn-small btn-danger delete-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">Delete</button>
|
||||
<button class="btn-small" onclick="shareFile('{{ item.path }}', '{{ item.name }}')">Share</button>
|
||||
<button class="btn-small btn-danger" onclick="deleteFile('{{ item.path }}', '{{ item.name }}')">Delete</button>
|
||||
</div>
|
||||
</td>
|
||||
</tr>
|
||||
@ -82,8 +82,8 @@
|
||||
<tr>
|
||||
<td colspan="6" style="text-align: center; padding: 40px;">
|
||||
<p>No files found in this directory.</p>
|
||||
<button class="btn-primary" id="create-first-folder-btn" style="margin-top: 10px;">Create your first folder</button>
|
||||
<button class="btn-outline" id="upload-first-file-btn" style="margin-top: 10px;">Upload a file</button>
|
||||
<button class="btn-primary" onclick="showNewFolderModal()" style="margin-top: 10px;">Create your first folder</button>
|
||||
<button class="btn-outline" onclick="showUploadModal()" style="margin-top: 10px;">Upload a file</button>
|
||||
</td>
|
||||
</tr>
|
||||
{% endif %}
|
||||
@ -108,17 +108,15 @@
|
||||
<div id="upload-modal" class="modal">
|
||||
<div class="modal-content">
|
||||
<span class="close" onclick="closeModal('upload-modal')">×</span>
|
||||
<h3>Upload Files</h3>
|
||||
<div class="upload-area">
|
||||
<input type="file" name="file" multiple class="form-input" id="file-input-multiple" style="display: none;">
|
||||
<label for="file-input-multiple" class="btn-outline upload-button">Select Files</label>
|
||||
<div id="selected-files-preview" class="selected-files-preview"></div>
|
||||
<div id="upload-progress-container" class="upload-progress-container"></div>
|
||||
</div>
|
||||
<h3>Upload File</h3>
|
||||
<form action="/files/upload" method="post" enctype="multipart/form-data">
|
||||
<input type="file" name="file" required class="form-input" id="file-input">
|
||||
<div class="file-info" id="file-info"></div>
|
||||
<div class="modal-actions">
|
||||
<button type="button" class="btn-primary" id="start-upload-btn" disabled>Upload</button>
|
||||
<button type="submit" class="btn-primary">Upload</button>
|
||||
<button type="button" class="btn-outline" onclick="closeModal('upload-modal')">Cancel</button>
|
||||
</div>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@ -129,8 +127,7 @@
|
||||
<p id="share-file-name"></p>
|
||||
<div id="share-link-container" style="display: none;">
|
||||
<input type="text" id="share-link-input" readonly class="form-input">
|
||||
<button class="btn-primary" id="copy-share-link-btn">Copy Link</button>
|
||||
<div id="share-links-list" class="share-links-list"></div>
|
||||
<button class="btn-primary" onclick="copyShareLink()">Copy Link</button>
|
||||
</div>
|
||||
<div id="share-loading">Generating share link...</div>
|
||||
<div class="modal-actions">
|
||||
@ -153,6 +150,146 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script type="module" src="/static/js/components/upload.js"></script>
|
||||
<script type="module" src="/static/js/main.js"></script>
|
||||
<script>
|
||||
function showNewFolderModal() {
|
||||
document.getElementById('new-folder-modal').style.display = 'block';
|
||||
}
|
||||
|
||||
function showUploadModal() {
|
||||
document.getElementById('upload-modal').style.display = 'block';
|
||||
}
|
||||
|
||||
function closeModal(modalId) {
|
||||
document.getElementById(modalId).style.display = 'none';
|
||||
}
|
||||
|
||||
window.onclick = function(event) {
|
||||
if (event.target.classList.contains('modal')) {
|
||||
event.target.style.display = 'none';
|
||||
}
|
||||
}
|
||||
|
||||
document.getElementById('file-input').addEventListener('change', function(e) {
|
||||
const file = e.target.files[0];
|
||||
if (file) {
|
||||
const size = (file.size / 1024 / 1024).toFixed(2);
|
||||
document.getElementById('file-info').innerHTML = `Selected: ${file.name} (${size} MB)`;
|
||||
}
|
||||
});
|
||||
|
||||
function downloadFile(path) {
|
||||
window.location.href = `/files/download/${path}`;
|
||||
}
|
||||
|
||||
async function shareFile(path, name) {
|
||||
const modal = document.getElementById('share-modal');
|
||||
const linkContainer = document.getElementById('share-link-container');
|
||||
const loading = document.getElementById('share-loading');
|
||||
|
||||
document.getElementById('share-file-name').textContent = `Sharing: ${name}`;
|
||||
linkContainer.style.display = 'none';
|
||||
loading.style.display = 'block';
|
||||
modal.style.display = 'block';
|
||||
|
||||
try {
|
||||
const response = await fetch(`/files/share/${path}`, {
|
||||
method: 'POST'
|
||||
});
|
||||
const data = await response.json();
|
||||
|
||||
if (data.share_link) {
|
||||
document.getElementById('share-link-input').value = data.share_link;
|
||||
linkContainer.style.display = 'block';
|
||||
loading.style.display = 'none';
|
||||
}
|
||||
} catch (error) {
|
||||
loading.textContent = 'Error generating share link';
|
||||
}
|
||||
}
|
||||
|
||||
function copyShareLink() {
|
||||
const input = document.getElementById('share-link-input');
|
||||
input.select();
|
||||
document.execCommand('copy');
|
||||
alert('Share link copied to clipboard');
|
||||
}
|
||||
|
||||
function deleteFile(path, name) {
|
||||
document.getElementById('delete-message').textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
|
||||
document.getElementById('delete-form').action = `/files/delete/${path}`;
|
||||
document.getElementById('delete-modal').style.display = 'block';
|
||||
}
|
||||
|
||||
document.getElementById('select-all').addEventListener('change', function(e) {
|
||||
const checkboxes = document.querySelectorAll('.file-checkbox');
|
||||
checkboxes.forEach(cb => cb.checked = e.target.checked);
|
||||
updateActionButtons();
|
||||
});
|
||||
|
||||
document.querySelectorAll('.file-checkbox').forEach(cb => {
|
||||
cb.addEventListener('change', updateActionButtons);
|
||||
});
|
||||
|
||||
function updateActionButtons() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
const downloadBtn = document.getElementById('download-btn');
|
||||
const shareBtn = document.getElementById('share-btn');
|
||||
const deleteBtn = document.getElementById('delete-btn');
|
||||
|
||||
const hasSelection = checked.length > 0;
|
||||
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
|
||||
|
||||
downloadBtn.disabled = !hasFiles;
|
||||
shareBtn.disabled = !hasSelection;
|
||||
deleteBtn.disabled = !hasSelection;
|
||||
}
|
||||
|
||||
function downloadSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
|
||||
downloadFile(checked[0].dataset.path);
|
||||
}
|
||||
}
|
||||
|
||||
function shareSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length === 1) {
|
||||
const path = checked[0].dataset.path;
|
||||
const name = checked[0].closest('tr').querySelector('td:nth-child(2)').textContent.trim();
|
||||
shareFile(path, name);
|
||||
}
|
||||
}
|
||||
|
||||
function deleteSelected() {
|
||||
const checked = document.querySelectorAll('.file-checkbox:checked');
|
||||
if (checked.length > 0) {
|
||||
const paths = Array.from(checked).map(cb => cb.dataset.path);
|
||||
const names = Array.from(checked).map(cb =>
|
||||
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
|
||||
);
|
||||
|
||||
if (checked.length === 1) {
|
||||
deleteFile(paths[0], names[0]);
|
||||
} else {
|
||||
document.getElementById('delete-message').textContent =
|
||||
`Are you sure you want to delete ${checked.length} items? This action cannot be undone.`;
|
||||
document.getElementById('delete-modal').style.display = 'block';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
document.getElementById('search-bar').addEventListener('input', function(e) {
|
||||
const searchTerm = e.target.value.toLowerCase();
|
||||
const rows = document.querySelectorAll('#file-list-body tr');
|
||||
|
||||
rows.forEach(row => {
|
||||
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
|
||||
if (name && name.includes(searchTerm)) {
|
||||
row.style.display = '';
|
||||
} else {
|
||||
row.style.display = 'none';
|
||||
}
|
||||
});
|
||||
});
|
||||
</script>
|
||||
{% endblock %}
|
||||
|
||||
@ -1,64 +0,0 @@
|
||||
{% extends "layouts/base.html" %}
|
||||
|
||||
{% block title %}Shared Folder - Retoor's Cloud Solutions{% endblock %}
|
||||
|
||||
{% block head %}
|
||||
<link rel="stylesheet" href="/static/css/components/file_browser.css">
|
||||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="container dashboard-container">
|
||||
<h1 class="page-title">Shared Folder</h1>
|
||||
|
||||
<div class="file-list-table">
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Last Modified</th>
|
||||
<th>Size</th>
|
||||
<th>Actions</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="file-list-body">
|
||||
{% if files %}
|
||||
{% for item in files %}
|
||||
<tr data-path="{{ item.path }}" data-is-dir="{{ item.is_dir }}">
|
||||
<td>
|
||||
{% if item.is_dir %}
|
||||
<img src="/static/images/icon-families.svg" alt="Folder Icon" class="file-icon">
|
||||
<a href="/shared_file/{{ share_id }}?path={{ item.path }}">{{ item.name }}</a>
|
||||
{% else %}
|
||||
<img src="/static/images/icon-professionals.svg" alt="File Icon" class="file-icon">
|
||||
{{ item.name }}
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>{{ item.last_modified[:10] }}</td>
|
||||
<td>
|
||||
{% if item.is_dir %}
|
||||
--
|
||||
{% else %}
|
||||
{{ (item.size / 1024 / 1024)|round(2) }} MB
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
<div class="action-buttons">
|
||||
{% if not item.is_dir %}
|
||||
<a href="/shared_file/{{ share_id }}/download?file_path={{ item.path }}" class="btn-small">⬇️</a>
|
||||
{% endif %}
|
||||
</div>
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<tr>
|
||||
<td colspan="4" style="text-align: center; padding: 40px;">
|
||||
<p>No files found in this shared directory.</p>
|
||||
</td>
|
||||
</tr>
|
||||
{% endif %}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
@ -1,4 +1,3 @@
|
||||
import logging
|
||||
from aiohttp import web
|
||||
import aiohttp_jinja2
|
||||
import os
|
||||
@ -8,14 +7,6 @@ from aiohttp.web_response import json_response
|
||||
from ..helpers.auth import login_required
|
||||
from .auth import CustomPydanticView
|
||||
|
||||
# Configure logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
handler = logging.StreamHandler()
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
# PROJECT_DIR is no longer directly used for user files, as FileService manages them
|
||||
# PROJECT_DIR = Path(__file__).parent.parent.parent / "project"
|
||||
|
||||
@ -54,7 +45,7 @@ class SiteView(web.View):
|
||||
|
||||
@login_required
|
||||
async def dashboard(self):
|
||||
raise web.HTTPFound(self.request.app.router["file_browser"].url_for())
|
||||
return web.HTTPFound(self.request.app.router["file_browser"].url_for())
|
||||
|
||||
async def solutions(self):
|
||||
return aiohttp_jinja2.render_template(
|
||||
@ -135,6 +126,9 @@ class SiteView(web.View):
|
||||
class FileBrowserView(web.View):
|
||||
@login_required
|
||||
async def get(self):
|
||||
if self.request.match_info.get("file_path") is not None:
|
||||
return await self.get_download_file()
|
||||
|
||||
user_email = self.request["user"]["email"]
|
||||
file_service = self.request.app["file_service"]
|
||||
|
||||
@ -163,14 +157,12 @@ class FileBrowserView(web.View):
|
||||
user_email = self.request["user"]["email"]
|
||||
file_service = self.request.app["file_service"]
|
||||
route_name = self.request.match_info.route.name
|
||||
logger.debug(f"FileBrowserView: POST request for route: {route_name}")
|
||||
|
||||
if route_name == "new_folder":
|
||||
data = await self.request.post()
|
||||
folder_name = data.get("folder_name")
|
||||
if not folder_name:
|
||||
logger.warning("FileBrowserView: New folder request missing folder_name")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="Folder name is required"
|
||||
)
|
||||
@ -178,42 +170,74 @@ class FileBrowserView(web.View):
|
||||
|
||||
success = await file_service.create_folder(user_email, folder_name)
|
||||
if success:
|
||||
logger.info(f"FileBrowserView: Folder '{folder_name}' created successfully for user {user_email}")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
success=f"Folder '{folder_name}' created successfully"
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to create folder '{folder_name}' for user {user_email}")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error=f"Folder '{folder_name}' already exists or could not be created"
|
||||
)
|
||||
)
|
||||
|
||||
elif route_name == "upload_file":
|
||||
try:
|
||||
reader = await self.request.multipart()
|
||||
field = await reader.next()
|
||||
if not field or field.name != "file":
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="No file selected for upload"
|
||||
)
|
||||
)
|
||||
|
||||
filename = field.filename
|
||||
if not filename:
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="Filename is required"
|
||||
)
|
||||
)
|
||||
|
||||
content = await field.read()
|
||||
success = await file_service.upload_file(user_email, filename, content)
|
||||
if success:
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
success=f"File '{filename}' uploaded successfully"
|
||||
)
|
||||
)
|
||||
else:
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error=f"Failed to upload file '{filename}'"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error=f"Upload error: {str(e)}"
|
||||
)
|
||||
)
|
||||
|
||||
elif route_name == "share_file":
|
||||
file_path = self.request.match_info.get("file_path")
|
||||
logger.debug(f"FileBrowserView: Share file request for path: {file_path} by user {user_email}")
|
||||
if not file_path:
|
||||
logger.warning("FileBrowserView: Share file request missing file_path")
|
||||
return json_response({"error": "File path is required for sharing"}, status=400)
|
||||
|
||||
share_id = await file_service.generate_share_link(user_email, file_path)
|
||||
if share_id:
|
||||
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
|
||||
logger.info(f"FileBrowserView: Share link generated: {share_link} for file: {file_path}")
|
||||
return json_response({"share_link": share_link})
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to generate share link for file: {file_path} by user {user_email}")
|
||||
return json_response({"error": "Failed to generate share link"}, status=500)
|
||||
|
||||
elif route_name == "delete_item":
|
||||
item_path = self.request.match_info.get("file_path")
|
||||
logger.debug(f"FileBrowserView: Delete item request for path: {item_path} by user {user_email}")
|
||||
if not item_path:
|
||||
logger.warning("FileBrowserView: Delete item request missing item_path")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="Item path is required for deletion"
|
||||
)
|
||||
@ -221,199 +245,38 @@ class FileBrowserView(web.View):
|
||||
|
||||
success = await file_service.delete_item(user_email, item_path)
|
||||
if success:
|
||||
logger.info(f"FileBrowserView: Item '{item_path}' deleted successfully for user {user_email}")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
success=f"Item deleted successfully"
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to delete item '{item_path}' for user {user_email}")
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="Failed to delete item - it may not exist"
|
||||
)
|
||||
)
|
||||
|
||||
elif route_name == "delete_multiple_items":
|
||||
data = await self.request.post()
|
||||
paths = data.getall("paths[]", [])
|
||||
logger.debug(f"FileBrowserView: Delete multiple items request for paths: {paths} by user {user_email}")
|
||||
|
||||
if not paths:
|
||||
logger.warning("FileBrowserView: Delete multiple items request missing paths")
|
||||
raise web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="No items selected for deletion"
|
||||
)
|
||||
)
|
||||
|
||||
all_successful = True
|
||||
for path in paths:
|
||||
success = await file_service.delete_item(user_email, path)
|
||||
if not success:
|
||||
all_successful = False
|
||||
logger.error(f"FileBrowserView: Failed to delete item '{path}' for user {user_email} during bulk delete")
|
||||
|
||||
if all_successful:
|
||||
logger.info(f"FileBrowserView: All {len(paths)} items deleted successfully for user {user_email}")
|
||||
raise web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
success=f"Successfully deleted {len(paths)} items"
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Some items failed to delete for user {user_email} during bulk delete")
|
||||
raise web.HTTPFound(
|
||||
self.request.app.router["file_browser"].url_for().with_query(
|
||||
error="Some items failed to delete"
|
||||
)
|
||||
)
|
||||
|
||||
elif route_name == "share_multiple_items":
|
||||
data = await self.request.json()
|
||||
paths = data.get("paths", [])
|
||||
logger.debug(f"FileBrowserView: Share multiple items request for paths: {paths} by user {user_email}")
|
||||
|
||||
if not paths:
|
||||
logger.warning("FileBrowserView: Share multiple items request missing paths")
|
||||
return json_response({"error": "No items selected for sharing"}, status=400)
|
||||
|
||||
share_links = []
|
||||
for path in paths:
|
||||
share_id = await file_service.generate_share_link(user_email, path)
|
||||
if share_id:
|
||||
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
|
||||
# Extract file name from path
|
||||
file_name = os.path.basename(path)
|
||||
share_links.append({"name": file_name, "link": share_link})
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to generate share link for file: {path} by user {user_email}")
|
||||
|
||||
if share_links:
|
||||
logger.info(f"FileBrowserView: Generated {len(share_links)} share links for user {user_email}")
|
||||
return json_response({"share_links": share_links})
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to generate any share links for user {user_email}")
|
||||
return json_response({"error": "Failed to generate share links for any selected items"}, status=500)
|
||||
|
||||
logger.warning(f"FileBrowserView: Unknown file action for POST request: {route_name}")
|
||||
return web.Response(status=400, text="Unknown file action")
|
||||
return web.HTTPBadRequest(text="Unknown file action")
|
||||
|
||||
@login_required
|
||||
async def get_download_file(self):
|
||||
request = self # self is the request object here
|
||||
user_email = request["user"]["email"]
|
||||
file_service = request.app["file_service"]
|
||||
file_path = request.match_info.get("file_path")
|
||||
logger.debug(f"FileBrowserView: Download file request for path: {file_path} by user {user_email}")
|
||||
user_email = self.request["user"]["email"]
|
||||
file_service = self.request.app["file_service"]
|
||||
file_path = self.request.match_info.get("file_path")
|
||||
|
||||
if not file_path:
|
||||
logger.warning("FileBrowserView: Download file request missing file_path")
|
||||
raise web.HTTPBadRequest(text="File path is required for download")
|
||||
return web.HTTPBadRequest(text="File path is required for download")
|
||||
|
||||
result = await file_service.download_file(user_email, file_path)
|
||||
|
||||
if result:
|
||||
content, filename = result
|
||||
response = web.Response(body=content)
|
||||
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
|
||||
response.headers["Content-Type"] = "application/octet-stream"
|
||||
logger.info(f"FileBrowserView: File '{filename}' downloaded successfully by user {user_email}")
|
||||
return response
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to download file: {file_path} for user {user_email} - file not found or access denied")
|
||||
raise web.HTTPNotFound(text="File not found")
|
||||
|
||||
async def shared_file_handler(self):
|
||||
share_id = self.request.match_info.get("share_id")
|
||||
file_service = self.request.app["file_service"]
|
||||
logger.debug(f"FileBrowserView: Handling shared file request for share_id: {share_id}")
|
||||
|
||||
shared_item = await file_service.get_shared_item(share_id)
|
||||
|
||||
if not shared_item:
|
||||
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id}")
|
||||
return aiohttp_jinja2.render_template(
|
||||
"pages/errors/404.html",
|
||||
self.request,
|
||||
{"request": self.request, "message": "Shared link is invalid or has expired."},
|
||||
status=404
|
||||
)
|
||||
|
||||
user_email = shared_item["user_email"]
|
||||
item_path = shared_item["item_path"]
|
||||
full_path = file_service._get_user_file_path(user_email, item_path)
|
||||
|
||||
if full_path.is_file():
|
||||
result = await file_service.get_shared_file_content(share_id)
|
||||
if result:
|
||||
content, filename = result
|
||||
response = web.Response(body=content)
|
||||
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
|
||||
response.headers["Content-Type"] = "application/octet-stream"
|
||||
logger.info(f"FileBrowserView: Serving shared file '{filename}' for share_id: {share_id}")
|
||||
return response
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to get content for shared file: {item_path} (share_id: {share_id})")
|
||||
raise web.HTTPNotFound(text="Shared file not found or inaccessible")
|
||||
elif full_path.is_dir():
|
||||
files = await file_service.get_shared_folder_content(share_id)
|
||||
logger.info(f"FileBrowserView: Serving shared folder '{item_path}' for share_id: {share_id}")
|
||||
return aiohttp_jinja2.render_template(
|
||||
"pages/shared_folder.html",
|
||||
self.request,
|
||||
{
|
||||
"request": self.request,
|
||||
"files": files,
|
||||
"current_path": item_path,
|
||||
"share_id": share_id,
|
||||
"user_email": user_email,
|
||||
"active_page": "shared"
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Shared item is neither file nor directory: {item_path} (share_id: {share_id})")
|
||||
raise web.HTTPNotFound(text="Shared item not found")
|
||||
|
||||
async def download_shared_file_handler(self):
|
||||
share_id = self.request.match_info.get("share_id")
|
||||
file_path = self.request.query.get("file_path") # This is the path of the file *within* the shared item
|
||||
file_service = self.request.app["file_service"]
|
||||
logger.debug(f"FileBrowserView: Handling download shared file request for share_id: {share_id}, file_path: {file_path}")
|
||||
|
||||
if not file_path:
|
||||
logger.warning("FileBrowserView: Download shared file request missing file_path query parameter.")
|
||||
raise web.HTTPBadRequest(text="File path is required for download from shared folder.")
|
||||
|
||||
shared_item = await file_service.get_shared_item(share_id)
|
||||
if not shared_item:
|
||||
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id} during download.")
|
||||
return aiohttp_jinja2.render_template(
|
||||
"pages/errors/404.html",
|
||||
self.request,
|
||||
{"request": self.request, "message": "Shared link is invalid or has expired."},
|
||||
status=404
|
||||
)
|
||||
|
||||
# Ensure the shared item is a directory if a file_path is provided
|
||||
user_email = shared_item["user_email"]
|
||||
original_shared_item_path = file_service._get_user_file_path(user_email, shared_item["item_path"])
|
||||
|
||||
if not original_shared_item_path.is_dir():
|
||||
logger.warning(f"FileBrowserView: Attempt to download a specific file from a shared item that is not a directory. Share_id: {share_id}")
|
||||
raise web.HTTPBadRequest(text="Cannot download specific files from a shared item that is not a folder.")
|
||||
|
||||
result = await file_service.get_shared_file_content(share_id, file_path)
|
||||
if result:
|
||||
content, filename = result
|
||||
response = web.Response(body=content)
|
||||
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
|
||||
response.headers["Content-Type"] = "application/octet-stream"
|
||||
logger.info(f"FileBrowserView: Serving shared file '{filename}' from shared folder for share_id: {share_id}")
|
||||
return response
|
||||
else:
|
||||
logger.error(f"FileBrowserView: Failed to get content for shared file: {file_path} (share_id: {share_id}) from shared folder.")
|
||||
raise web.HTTPNotFound(text="Shared file not found or inaccessible within the shared folder.")
|
||||
return web.HTTPNotFound(text="File not found")
|
||||
|
||||
|
||||
class OrderView(CustomPydanticView):
|
||||
@ -460,7 +323,7 @@ class UserManagementView(web.View):
|
||||
elif route_name == "user_details":
|
||||
return await self.user_details_page()
|
||||
|
||||
raise web.HTTPNotFound()
|
||||
return web.HTTPNotFound()
|
||||
|
||||
@login_required
|
||||
async def post(self):
|
||||
@ -473,7 +336,7 @@ class UserManagementView(web.View):
|
||||
elif route_name == "delete_user_page":
|
||||
return await self.delete_user_submit()
|
||||
|
||||
raise web.HTTPNotFound()
|
||||
return web.HTTPNotFound()
|
||||
|
||||
async def add_user_page(self):
|
||||
return aiohttp_jinja2.render_template(
|
||||
@ -540,7 +403,7 @@ class UserManagementView(web.View):
|
||||
|
||||
user_service.update_user_quota(email, float(storage_quota_gb))
|
||||
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["users"].url_for().with_query(
|
||||
success=f"User {email} added successfully"
|
||||
)
|
||||
@ -566,7 +429,7 @@ class UserManagementView(web.View):
|
||||
user_data = user_service.get_user_by_email(email)
|
||||
|
||||
if not user_data:
|
||||
raise web.HTTPNotFound(text="User not found")
|
||||
return web.HTTPNotFound(text="User not found")
|
||||
|
||||
success_message = self.request.query.get("success")
|
||||
|
||||
@ -601,7 +464,7 @@ class UserManagementView(web.View):
|
||||
user_data = user_service.get_user_by_email(email)
|
||||
|
||||
if not user_data:
|
||||
raise web.HTTPNotFound(text="User not found")
|
||||
return web.HTTPNotFound(text="User not found")
|
||||
|
||||
if errors:
|
||||
return aiohttp_jinja2.render_template(
|
||||
@ -618,7 +481,7 @@ class UserManagementView(web.View):
|
||||
|
||||
user_service.update_user_quota(email, storage_quota_gb)
|
||||
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["edit_user"].url_for(email=email).with_query(
|
||||
success="User quota updated successfully"
|
||||
)
|
||||
@ -631,7 +494,7 @@ class UserManagementView(web.View):
|
||||
user_data = user_service.get_user_by_email(email)
|
||||
|
||||
if not user_data:
|
||||
raise web.HTTPNotFound(text="User not found")
|
||||
return web.HTTPNotFound(text="User not found")
|
||||
|
||||
return aiohttp_jinja2.render_template(
|
||||
"pages/user_details.html",
|
||||
@ -651,11 +514,11 @@ class UserManagementView(web.View):
|
||||
user_data = user_service.get_user_by_email(email)
|
||||
|
||||
if not user_data:
|
||||
raise web.HTTPNotFound(text="User not found")
|
||||
return web.HTTPNotFound(text="User not found")
|
||||
|
||||
user_service.delete_user(email)
|
||||
|
||||
raise web.HTTPFound(
|
||||
return web.HTTPFound(
|
||||
self.request.app.router["users"].url_for().with_query(
|
||||
success=f"User {email} deleted successfully"
|
||||
)
|
||||
|
||||
@ -1,50 +0,0 @@
|
||||
from aiohttp import web
|
||||
import aiohttp_jinja2
|
||||
from aiohttp.web_response import json_response
|
||||
|
||||
from ..helpers.auth import login_required
|
||||
|
||||
class UploadView(web.View):
|
||||
@login_required
|
||||
async def post(self):
|
||||
user_email = self.request["user"]["email"]
|
||||
file_service = self.request.app["file_service"]
|
||||
# Get current path from query parameter or form data
|
||||
current_path = self.request.query.get("current_path", "")
|
||||
|
||||
try:
|
||||
reader = await self.request.multipart()
|
||||
files_uploaded = []
|
||||
errors = []
|
||||
|
||||
while True:
|
||||
field = await reader.next()
|
||||
if field is None:
|
||||
break
|
||||
|
||||
# Check if the field is a file input
|
||||
if field.name == "file": # Assuming the input field name is 'file'
|
||||
filename = field.filename
|
||||
if not filename:
|
||||
errors.append("Filename is required for one of the files.")
|
||||
continue
|
||||
|
||||
content = await field.read()
|
||||
# Construct the full file path relative to the user's base directory
|
||||
full_file_path_for_service = f"{current_path}/{filename}" if current_path else filename
|
||||
|
||||
success = await file_service.upload_file(user_email, full_file_path_for_service, content)
|
||||
if success:
|
||||
files_uploaded.append(filename)
|
||||
else:
|
||||
errors.append(f"Failed to upload file '{filename}'")
|
||||
|
||||
if errors:
|
||||
return json_response({"status": "error", "message": "Some files failed to upload", "details": errors}, status=500)
|
||||
elif files_uploaded:
|
||||
return json_response({"status": "success", "message": f"Successfully uploaded {len(files_uploaded)} files", "files": files_uploaded})
|
||||
else:
|
||||
return json_response({"status": "error", "message": "No files were uploaded"}, status=400)
|
||||
|
||||
except Exception as e:
|
||||
return json_response({"status": "error", "message": f"Upload error: {str(e)}"}, status=500)
|
||||
@ -5,59 +5,6 @@ from retoors.main import create_app
|
||||
from retoors.services.config_service import ConfigService
|
||||
from pytest_mock import MockerFixture # Import MockerFixture
|
||||
import datetime # Import datetime
|
||||
from aiohttp.test_utils import TestClient # Import TestClient
|
||||
from aiohttp_session import setup as setup_session # Import setup_session
|
||||
from aiohttp_session.cookie_storage import EncryptedCookieStorage # Import EncryptedCookieStorage
|
||||
from retoors.helpers.env_manager import get_or_create_session_secret_key # Import get_or_create_session_secret_key
|
||||
from retoors.middlewares import user_middleware, error_middleware # Import middlewares
|
||||
from retoors.services.user_service import UserService # Import UserService
|
||||
from retoors.routes import setup_routes # Import setup_routes
|
||||
import aiohttp_jinja2 # Import aiohttp_jinja2
|
||||
import jinja2 # Import jinja2
|
||||
import aiohttp # Import aiohttp
|
||||
from retoors.services.file_service import FileService # Import FileService
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_user_files_dir(tmp_path):
|
||||
"""Fixture to create a temporary directory for user files."""
|
||||
user_files_dir = tmp_path / "user_files"
|
||||
user_files_dir.mkdir()
|
||||
return user_files_dir
|
||||
|
||||
@pytest.fixture
|
||||
def temp_users_json(tmp_path):
|
||||
"""Fixture to create a temporary users.json file."""
|
||||
users_json_path = tmp_path / "users.json"
|
||||
initial_users_data = [
|
||||
{
|
||||
"email": "test@example.com",
|
||||
"full_name": "Test User",
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": None,
|
||||
"shared_items": {}
|
||||
},
|
||||
{
|
||||
"email": "child@example.com",
|
||||
"full_name": "Child User",
|
||||
"email": "child@example.com",
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 5,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": "test@example.com",
|
||||
"shared_items": {}
|
||||
}
|
||||
]
|
||||
with open(users_json_path, "w") as f:
|
||||
json.dump(initial_users_data, f)
|
||||
return users_json_path
|
||||
|
||||
@pytest.fixture
|
||||
def file_service_instance(temp_user_files_dir, temp_users_json):
|
||||
"""Fixture to provide a FileService instance with temporary directories."""
|
||||
return FileService(temp_user_files_dir, temp_users_json)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -65,42 +12,6 @@ def create_app_instance():
|
||||
"""Fixture to create a new aiohttp application instance."""
|
||||
return create_app()
|
||||
|
||||
@pytest.fixture
|
||||
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
|
||||
"""Fixture to create a test aiohttp application with mocked services."""
|
||||
from aiohttp import web
|
||||
|
||||
app = web.Application()
|
||||
|
||||
# Setup session for the test app
|
||||
project_root = Path(__file__).parent.parent
|
||||
env_file_path = project_root / ".env"
|
||||
secret_key = get_or_create_session_secret_key(env_file_path)
|
||||
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
|
||||
|
||||
app.middlewares.append(error_middleware)
|
||||
app.middlewares.append(user_middleware)
|
||||
|
||||
# Mock UserService
|
||||
mock_user_service = mocker.MagicMock(spec=UserService)
|
||||
|
||||
# Mock scheduler
|
||||
mock_scheduler = mocker.MagicMock()
|
||||
mock_scheduler.spawn = mocker.AsyncMock()
|
||||
mock_scheduler.close = mocker.AsyncMock()
|
||||
|
||||
app["user_service"] = mock_user_service
|
||||
app["file_service"] = file_service_instance
|
||||
app["scheduler"] = mock_scheduler
|
||||
|
||||
# Setup Jinja2 for templates
|
||||
base_path = Path(__file__).parent.parent / "retoors"
|
||||
templates_path = base_path / "templates"
|
||||
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
|
||||
|
||||
setup_routes(app)
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_users_db_fixture():
|
||||
@ -128,8 +39,9 @@ def mock_users_db_fixture():
|
||||
"storage_quota_gb": 50,
|
||||
"storage_used_gb": 5,
|
||||
"parent_email": "admin@example.com",
|
||||
"shared_items": {}
|
||||
}
|
||||
"reset_token": None,
|
||||
"reset_token_expiry": None,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@ -330,118 +242,6 @@ async def client(
|
||||
config_file.unlink(missing_ok=True) # Use missing_ok for robustness
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def logged_in_client(aiohttp_client, create_test_app, mocker):
|
||||
"""Fixture to provide an aiohttp client with a logged-in user."""
|
||||
app = create_test_app
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
user_service = app["user_service"]
|
||||
|
||||
def mock_create_user(full_name, email, password, parent_email=None):
|
||||
return {
|
||||
"full_name": full_name,
|
||||
"email": email,
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": parent_email,
|
||||
"shared_items": {}
|
||||
}
|
||||
|
||||
def mock_authenticate_user(email, password):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Test User",
|
||||
"is_admin": False,
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
def mock_get_user_by_email(email):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Test User",
|
||||
"is_admin": False,
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||
|
||||
await client.post(
|
||||
"/register",
|
||||
data={
|
||||
"full_name": "Test User",
|
||||
"email": "test@example.com",
|
||||
"password": "password",
|
||||
"confirm_password": "password",
|
||||
},
|
||||
)
|
||||
await client.post(
|
||||
"/login", data={"email": "test@example.com", "password": "password"}
|
||||
)
|
||||
|
||||
return client
|
||||
|
||||
@pytest.fixture
|
||||
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
|
||||
"""Fixture to provide an aiohttp client with a logged-in admin user."""
|
||||
app = create_test_app
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
user_service = app["user_service"]
|
||||
|
||||
def mock_create_user(full_name, email, password, parent_email=None):
|
||||
return {
|
||||
"full_name": full_name,
|
||||
"email": email,
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": parent_email,
|
||||
"shared_items": {}
|
||||
}
|
||||
|
||||
def mock_authenticate_user(email, password):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Admin User",
|
||||
"is_admin": True,
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
def mock_get_user_by_email(email):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Admin User",
|
||||
"is_admin": True,
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||
|
||||
await client.post(
|
||||
"/register",
|
||||
data={
|
||||
"full_name": "Admin User",
|
||||
"email": "admin@example.com",
|
||||
"password": "password",
|
||||
"confirm_password": "password",
|
||||
},
|
||||
)
|
||||
await client.post(
|
||||
"/login", data={"email": "admin@example.com", "password": "password"}
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_send_email(mocker: MockerFixture):
|
||||
"""
|
||||
|
||||
@ -4,7 +4,6 @@ from pathlib import Path
|
||||
import json
|
||||
import datetime
|
||||
import aiohttp
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestClient
|
||||
from aiohttp_session import setup as setup_session
|
||||
from aiohttp_session.cookie_storage import EncryptedCookieStorage
|
||||
@ -13,9 +12,157 @@ from retoors.helpers.env_manager import get_or_create_session_secret_key
|
||||
# Assuming the FileService is in retoors/services/file_service.py
|
||||
# and the FileBrowserView is in retoors/views/site.py
|
||||
|
||||
@pytest.fixture
|
||||
def temp_user_files_dir(tmp_path):
|
||||
"""Fixture to create a temporary directory for user files."""
|
||||
user_files_dir = tmp_path / "user_files"
|
||||
user_files_dir.mkdir()
|
||||
return user_files_dir
|
||||
|
||||
@pytest.fixture
|
||||
def temp_users_json(tmp_path):
|
||||
"""Fixture to create a temporary users.json file."""
|
||||
users_json_path = tmp_path / "users.json"
|
||||
initial_users_data = {
|
||||
"test@example.com": {
|
||||
"full_name": "Test User",
|
||||
"email": "test@example.com",
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": None,
|
||||
"shared_items": {}
|
||||
},
|
||||
"child@example.com": {
|
||||
"full_name": "Child User",
|
||||
"email": "child@example.com",
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 5,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": "test@example.com",
|
||||
"shared_items": {}
|
||||
}
|
||||
}
|
||||
with open(users_json_path, "w") as f:
|
||||
json.dump(initial_users_data, f)
|
||||
return users_json_path
|
||||
|
||||
@pytest.fixture
|
||||
def file_service_instance(temp_user_files_dir, temp_users_json):
|
||||
"""Fixture to provide a FileService instance with temporary directories."""
|
||||
from retoors.services.file_service import FileService
|
||||
return FileService(temp_user_files_dir, temp_users_json)
|
||||
|
||||
@pytest.fixture
|
||||
async def logged_in_client(aiohttp_client, create_test_app, mocker):
|
||||
"""Fixture to provide an aiohttp client with a logged-in user."""
|
||||
app = create_test_app
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
user_service = app["user_service"]
|
||||
|
||||
def mock_create_user(full_name, email, password, parent_email=None):
|
||||
return {
|
||||
"full_name": full_name,
|
||||
"email": email,
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": parent_email,
|
||||
"shared_items": {}
|
||||
}
|
||||
|
||||
def mock_authenticate_user(email, password):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Test User",
|
||||
"is_admin": False,
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
def mock_get_user_by_email(email):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Test User",
|
||||
"is_admin": False,
|
||||
"storage_quota_gb": 10,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||
|
||||
await client.post(
|
||||
"/register",
|
||||
data={
|
||||
"full_name": "Test User",
|
||||
"email": "test@example.com",
|
||||
"password": "password",
|
||||
"confirm_password": "password",
|
||||
},
|
||||
)
|
||||
await client.post(
|
||||
"/login", data={"email": "test@example.com", "password": "password"}
|
||||
)
|
||||
|
||||
return client
|
||||
|
||||
@pytest.fixture
|
||||
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
|
||||
"""Fixture to provide an aiohttp client with a logged-in admin user."""
|
||||
app = create_test_app
|
||||
client = await aiohttp_client(app)
|
||||
|
||||
user_service = app["user_service"]
|
||||
|
||||
def mock_create_user(full_name, email, password, parent_email=None):
|
||||
return {
|
||||
"full_name": full_name,
|
||||
"email": email,
|
||||
"password": "hashed_password",
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0,
|
||||
"parent_email": parent_email,
|
||||
"shared_items": {}
|
||||
}
|
||||
|
||||
def mock_authenticate_user(email, password):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Admin User",
|
||||
"is_admin": True,
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
def mock_get_user_by_email(email):
|
||||
return {
|
||||
"email": email,
|
||||
"full_name": "Admin User",
|
||||
"is_admin": True,
|
||||
"storage_quota_gb": 100,
|
||||
"storage_used_gb": 0
|
||||
}
|
||||
|
||||
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
|
||||
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
|
||||
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
|
||||
|
||||
await client.post(
|
||||
"/register",
|
||||
data={
|
||||
"full_name": "Admin User",
|
||||
"email": "admin@example.com",
|
||||
"password": "password",
|
||||
"confirm_password": "password",
|
||||
},
|
||||
)
|
||||
await client.post(
|
||||
"/login", data={"email": "admin@example.com", "password": "password"}
|
||||
)
|
||||
return client
|
||||
|
||||
|
||||
# --- FileService Tests ---
|
||||
@ -163,10 +310,10 @@ async def test_file_service_shared_link_expiry(file_service_instance, mocker):
|
||||
await file_service_instance.upload_file(user_email, file_path, b"expiring content")
|
||||
share_id = await file_service_instance.generate_share_link(user_email, file_path)
|
||||
|
||||
# Mock datetime to simulate an expired link (after generating the link)
|
||||
# Mock datetime to simulate an expired link
|
||||
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
|
||||
mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime)
|
||||
mock_datetime.datetime.now = mocker.Mock(return_value=future_time)
|
||||
mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime))
|
||||
datetime.datetime.now.return_value = future_time
|
||||
|
||||
shared_item = await file_service_instance.get_shared_item(share_id)
|
||||
assert shared_item is None
|
||||
@ -208,25 +355,6 @@ async def test_file_browser_new_folder(logged_in_client: TestClient, file_servic
|
||||
expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
|
||||
assert expected_path.is_dir()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient):
|
||||
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert "error=Folder+name+is+required" in resp.headers["Location"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
folder_name = "existing_folder_web"
|
||||
await file_service_instance.create_folder(user_email, folder_name) # Create it first
|
||||
|
||||
# Mock create_folder to return False, simulating it already exists or failed
|
||||
mocker.patch.object(file_service_instance, "create_folder", return_value=False)
|
||||
|
||||
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||
user_email = "test@example.com"
|
||||
@ -240,7 +368,8 @@ async def test_file_browser_upload_file(logged_in_client: TestClient, file_servi
|
||||
content_type='text/plain')
|
||||
|
||||
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
|
||||
assert resp.status == 200
|
||||
assert resp.status == 302 # Redirect
|
||||
assert resp.headers["Location"].startswith("/files")
|
||||
|
||||
expected_path = temp_user_files_dir / user_email / "uploaded.txt"
|
||||
assert expected_path.is_file()
|
||||
@ -260,9 +389,8 @@ async def test_file_browser_download_file(logged_in_client: TestClient, file_ser
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
|
||||
response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
|
||||
assert response.status == 404
|
||||
assert "File not found" in await response.text()
|
||||
resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
|
||||
assert resp.status == 404
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||
@ -294,272 +422,61 @@ async def test_file_browser_delete_folder(logged_in_client: TestClient, file_ser
|
||||
assert not expected_path.is_dir()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||
async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance):
|
||||
user_email = "test@example.com"
|
||||
file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"]
|
||||
for name in file_names:
|
||||
await file_service_instance.upload_file(user_email, name, b"content")
|
||||
|
||||
paths_to_delete = [f"{name}" for name in file_names]
|
||||
|
||||
# Construct FormData for multiple paths
|
||||
data = aiohttp.FormData()
|
||||
for path in paths_to_delete:
|
||||
data.add_field('paths[]', path)
|
||||
|
||||
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
||||
assert resp.status == 302 # Redirect
|
||||
assert resp.headers["Location"].startswith("/files")
|
||||
|
||||
for name in file_names:
|
||||
expected_path = temp_user_files_dir / user_email / name
|
||||
assert not expected_path.is_file()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
|
||||
user_email = "test@example.com"
|
||||
folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"]
|
||||
for name in folder_names:
|
||||
await file_service_instance.create_folder(user_email, name)
|
||||
await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content")
|
||||
|
||||
paths_to_delete = [f"{name}" for name in folder_names]
|
||||
|
||||
# Construct FormData for multiple paths
|
||||
data = aiohttp.FormData()
|
||||
for path in paths_to_delete:
|
||||
data.add_field('paths[]', path)
|
||||
|
||||
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
||||
assert resp.status == 302 # Redirect
|
||||
assert resp.headers["Location"].startswith("/files")
|
||||
|
||||
for name in folder_names:
|
||||
expected_path = temp_user_files_dir / user_email / name
|
||||
assert not expected_path.is_dir()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient):
|
||||
resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert "error=No+items+selected+for+deletion" in resp.headers["Location"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, temp_user_files_dir, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_names = ["fail_delete_1.txt", "fail_delete_2.txt"]
|
||||
for name in file_names:
|
||||
await file_service_instance.upload_file(user_email, name, b"content")
|
||||
|
||||
paths_to_delete = [f"{name}" for name in file_names]
|
||||
|
||||
# Mock delete_item to fail for the first item
|
||||
original_delete_item = file_service_instance.delete_item
|
||||
async def mock_delete_item(email, path):
|
||||
if path == file_names[0]:
|
||||
return False # Simulate failure for the first item
|
||||
return await original_delete_item(email, path)
|
||||
|
||||
mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item)
|
||||
|
||||
data = aiohttp.FormData()
|
||||
for path in paths_to_delete:
|
||||
data.add_field('paths[]', path)
|
||||
|
||||
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
|
||||
assert resp.status == 302 # Redirect
|
||||
assert "error=Some+items+failed+to+delete" in resp.headers["Location"]
|
||||
|
||||
# Check if the first file still exists (failed to delete)
|
||||
assert (temp_user_files_dir / user_email / file_names[0]).is_file()
|
||||
# Check if the second file is deleted (succeeded)
|
||||
assert not (temp_user_files_dir / user_email / file_names[1]).is_file()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient):
|
||||
resp = await logged_in_client.post("/files/share_multiple", json={"paths": []})
|
||||
assert resp.status == 400
|
||||
data = await resp.json()
|
||||
assert data["error"] == "No items selected for sharing"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_share_file_missing_path(logged_in_client: TestClient):
|
||||
resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL
|
||||
assert resp.status == 400
|
||||
data = await resp.json()
|
||||
assert data["error"] == "File path is required for sharing"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_name = "fail_share_link.txt"
|
||||
await file_service_instance.upload_file(user_email, file_name, b"content")
|
||||
|
||||
mocker.patch.object(file_service_instance, "generate_share_link", return_value=None)
|
||||
file_name = "web_share.txt"
|
||||
await file_service_instance.upload_file(user_email, file_name, b"shareable content")
|
||||
|
||||
resp = await logged_in_client.post(f"/files/share/{file_name}")
|
||||
assert resp.status == 500
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert data["error"] == "Failed to generate share link"
|
||||
assert "share_link" in data
|
||||
assert "http" in data["share_link"] # Check if it's a URL
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient):
|
||||
resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL
|
||||
assert resp.status == 302
|
||||
assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"]
|
||||
# Verify the shared item can be retrieved
|
||||
# The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID
|
||||
share_id = data["share_link"].split("/")[-1]
|
||||
shared_item = await file_service_instance.get_shared_item(share_id)
|
||||
assert shared_item is not None
|
||||
assert shared_item["item_path"] == file_name
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_name = "fail_delete.txt"
|
||||
await file_service_instance.upload_file(user_email, file_name, b"content")
|
||||
@pytest.fixture
|
||||
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
|
||||
"""Fixture to create a test aiohttp application with mocked services."""
|
||||
from aiohttp import web
|
||||
from retoors.middlewares import user_middleware, error_middleware
|
||||
from retoors.services.user_service import UserService
|
||||
from retoors.routes import setup_routes
|
||||
import aiohttp_jinja2
|
||||
import jinja2
|
||||
|
||||
mocker.patch.object(file_service_instance, "delete_item", return_value=False)
|
||||
app = web.Application()
|
||||
|
||||
resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"]
|
||||
# Setup session for the test app
|
||||
project_root = Path(__file__).parent.parent
|
||||
env_file_path = project_root / ".env"
|
||||
secret_key = get_or_create_session_secret_key(env_file_path)
|
||||
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
folder_name = "shared_folder"
|
||||
file_name = "nested.txt"
|
||||
share_id = "test_share_id"
|
||||
app.middlewares.append(error_middleware)
|
||||
app.middlewares.append(user_middleware)
|
||||
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
||||
"user_email": user_email,
|
||||
"item_path": folder_name,
|
||||
"share_id": share_id,
|
||||
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||||
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
||||
})
|
||||
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: True))
|
||||
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
|
||||
# Mock UserService
|
||||
mock_user_service = mocker.MagicMock(spec=UserService)
|
||||
|
||||
resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}")
|
||||
assert resp.status == 404
|
||||
text = await resp.text()
|
||||
assert "Shared file not found or inaccessible within the shared folder." in text
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_name = "shared_file.txt"
|
||||
share_id = "test_share_id"
|
||||
# Mock scheduler
|
||||
mock_scheduler = mocker.MagicMock()
|
||||
mock_scheduler.spawn = mocker.AsyncMock()
|
||||
mock_scheduler.close = mocker.AsyncMock()
|
||||
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
||||
"user_email": user_email,
|
||||
"item_path": file_name,
|
||||
"share_id": share_id,
|
||||
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||||
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
||||
})
|
||||
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: False))
|
||||
|
||||
resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt")
|
||||
assert resp.status == 400
|
||||
text = await resp.text()
|
||||
assert "Cannot download specific files from a shared item that is not a folder." in text
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker):
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
|
||||
resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt")
|
||||
assert resp.status == 404
|
||||
text = await resp.text()
|
||||
assert "Shared link is invalid or has expired." in text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient):
|
||||
resp = await client.get("/shared_file/some_share_id/download")
|
||||
assert resp.status == 400
|
||||
assert "File path is required for download from shared folder." in await resp.text()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_name = "shared_file.txt"
|
||||
share_id = "test_share_id"
|
||||
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
||||
"user_email": user_email,
|
||||
"item_path": file_name,
|
||||
"share_id": share_id,
|
||||
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||||
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
||||
})
|
||||
mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file
|
||||
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
|
||||
|
||||
resp = await client.get(f"/shared_file/{share_id}")
|
||||
assert resp.status == 404
|
||||
text = await resp.text()
|
||||
assert "Shared file not found or inaccessible" in text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
item_path = "mystery_item"
|
||||
share_id = "test_share_id"
|
||||
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
|
||||
"user_email": user_email,
|
||||
"item_path": item_path,
|
||||
"share_id": share_id,
|
||||
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
||||
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
|
||||
})
|
||||
|
||||
# Mock Path.is_file and Path.is_dir to return False
|
||||
mocker.patch("pathlib.Path.is_file", return_value=False)
|
||||
mocker.patch("pathlib.Path.is_dir", return_value=False)
|
||||
|
||||
resp = await client.get(f"/shared_file/{share_id}")
|
||||
assert resp.status == 404
|
||||
text = await resp.text()
|
||||
assert "Shared item not found" in text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker):
|
||||
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
|
||||
resp = await client.get("/shared_file/nonexistent_share_id")
|
||||
assert resp.status == 404
|
||||
text = await resp.text()
|
||||
assert "Shared link is invalid or has expired." in text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker):
|
||||
# Mock the route name to simulate an unknown action
|
||||
mock_route = mocker.MagicMock(name="unknown_action")
|
||||
mock_route.current_app = logged_in_client.app # Provide a mock current_app
|
||||
mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route})
|
||||
|
||||
resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False)
|
||||
assert resp.status == 400
|
||||
assert "Unknown file action" in await resp.text()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
|
||||
user_email = "test@example.com"
|
||||
file_names = ["fail_share_1.txt", "fail_share_2.txt"]
|
||||
for name in file_names:
|
||||
await file_service_instance.upload_file(user_email, name, b"content to share")
|
||||
|
||||
paths_to_share = [f"{name}" for name in file_names]
|
||||
|
||||
# Mock generate_share_link to fail for the first item
|
||||
original_generate_share_link = file_service_instance.generate_share_link
|
||||
async def mock_generate_share_link(email, path):
|
||||
if path == file_names[0]:
|
||||
return None # Simulate failure for the first item
|
||||
return await original_generate_share_link(email, path)
|
||||
|
||||
mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link)
|
||||
|
||||
resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share})
|
||||
assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds
|
||||
data = await resp.json()
|
||||
assert "share_links" in data
|
||||
assert len(data["share_links"]) == 1 # Only one link should be generated
|
||||
assert data["share_links"][0]["name"] == file_names[1] # The successful one
|
||||
app["user_service"] = mock_user_service
|
||||
app["file_service"] = file_service_instance
|
||||
app["scheduler"] = mock_scheduler
|
||||
|
||||
# Setup Jinja2 for templates
|
||||
base_path = Path(__file__).parent.parent / "retoors"
|
||||
templates_path = base_path / "templates"
|
||||
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
|
||||
|
||||
setup_routes(app)
|
||||
return app
|
||||
|
||||
@ -133,44 +133,34 @@ async def test_privacy_get(client):
|
||||
assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text
|
||||
|
||||
|
||||
async def test_shared_get_authorized(logged_in_client):
|
||||
resp = await logged_in_client.get("/shared")
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert "Shared with me" in text
|
||||
assert "Files and folders that have been shared with you will appear here." in text
|
||||
async def test_shared_get_unauthorized(client):
|
||||
resp = await client.get("/shared", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert resp.headers["Location"] == "/login"
|
||||
|
||||
|
||||
async def test_recent_get_authorized(logged_in_client):
|
||||
resp = await logged_in_client.get("/recent")
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert "Recent Files" in text
|
||||
assert "Your recently accessed files will appear here." in text
|
||||
async def test_recent_get_unauthorized(client):
|
||||
resp = await client.get("/recent", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert resp.headers["Location"] == "/login"
|
||||
|
||||
|
||||
async def test_favorites_get_authorized(logged_in_client):
|
||||
resp = await logged_in_client.get("/favorites")
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert "Favorites" in text
|
||||
assert "Your favorite files and folders will appear here." in text
|
||||
async def test_favorites_get_unauthorized(client):
|
||||
resp = await client.get("/favorites", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert resp.headers["Location"] == "/login"
|
||||
|
||||
|
||||
async def test_trash_get_authorized(logged_in_client):
|
||||
resp = await logged_in_client.get("/trash")
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert "Trash" in text
|
||||
assert "Files and folders you have deleted will appear here." in text
|
||||
async def test_trash_get_unauthorized(client):
|
||||
resp = await client.get("/trash", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert resp.headers["Location"] == "/login"
|
||||
|
||||
|
||||
async def test_users_get_authorized(logged_in_client):
|
||||
resp = await logged_in_client.get("/users")
|
||||
assert resp.status == 200
|
||||
text = await resp.text()
|
||||
assert "User Management" in text
|
||||
assert "+ Add New User" in text
|
||||
async def test_file_browser_get_unauthorized(client):
|
||||
resp = await client.get("/files", allow_redirects=False)
|
||||
assert resp.status == 302
|
||||
assert resp.headers["Location"] == "/login"
|
||||
|
||||
|
||||
async def test_file_browser_get_authorized(client):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user