Compare commits

...

10 Commits

Author SHA1 Message Date
f54941dd80 Broken. 2025-11-09 02:34:06 +01:00
059457deae Update. 2025-11-09 02:14:26 +01:00
5d3d0b162d Update. 2025-11-09 01:54:31 +01:00
ea8af383cc Update. 2025-11-09 01:49:11 +01:00
6e47d43a03 Update. 2025-11-09 01:35:03 +01:00
88d57c3837 Update. 2025-11-09 01:34:53 +01:00
9e9907bc00 Update. 2025-11-09 00:52:26 +01:00
c6fb77c89d Update. 2025-11-09 00:52:16 +01:00
81f1cfd200 Update. 2025-11-09 00:33:57 +01:00
e228a2e59c Update. 2025-11-09 00:27:13 +01:00
18 changed files with 1439 additions and 474 deletions

View File

@ -9,3 +9,4 @@ pytest
pytest-aiohttp pytest-aiohttp
aiohttp-test-utils aiohttp-test-utils
pytest-mock pytest-mock
pillow

View File

@ -6,13 +6,17 @@ from ..services.user_service import UserService # Import UserService
def login_required(func): def login_required(func):
@wraps(func) @wraps(func)
async def wrapper(self, *args, **kwargs): async def wrapper(self, *args, **kwargs):
session = await get_session(self.request) if not getattr(self, 'request', None):
request = self
else:
request = self.request
session = await get_session(request)
user_email = session.get('user_email') user_email = session.get('user_email')
if not user_email: if not user_email:
raise web.HTTPFound('/login') raise web.HTTPFound('/login')
user_service: UserService = self.request.app["user_service"] user_service: UserService = request.app["user_service"]
user = user_service.get_user_by_email(user_email) user = user_service.get_user_by_email(user_email)
if not user: if not user:
@ -21,6 +25,6 @@ def login_required(func):
raise web.HTTPFound('/login') raise web.HTTPFound('/login')
# Ensure the user object is available in the request for views # Ensure the user object is available in the request for views
self.request["user"] = user request["user"] = user
return await func(self, *args, **kwargs) return await func(self, *args, **kwargs)
return wrapper return wrapper

View File

@ -11,7 +11,7 @@ from .routes import setup_routes
from .services.user_service import UserService from .services.user_service import UserService
from .services.config_service import ConfigService from .services.config_service import ConfigService
from .services.file_service import FileService # Import FileService from .services.file_service import FileService # Import FileService
from .middlewares import user_middleware, error_middleware from .middlewares import user_middleware, error_middleware,request_hybrid_middleware
from .helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key # Import new function from .helpers.env_manager import ensure_env_file_exists, get_or_create_session_secret_key # Import new function
@ -40,6 +40,7 @@ def create_app():
# The order of middleware registration matters. # The order of middleware registration matters.
# They are executed in the order they are added. # They are executed in the order they are added.
app.middlewares.append(request_hybrid_middleware)
app.middlewares.append(error_middleware) app.middlewares.append(error_middleware)
# Setup session # Setup session

View File

@ -1,6 +1,10 @@
from aiohttp import web from aiohttp import web
from aiohttp_session import get_session from aiohttp_session import get_session
@web.middleware
async def request_hybrid_middleware(request, handler):
setattr(request,'request', request)
return await handler(request)
@web.middleware @web.middleware
async def user_middleware(request, handler): async def user_middleware(request, handler):
@ -20,3 +24,4 @@ async def error_middleware(request, handler):
raise # Re-raise HTTPException to see original traceback raise # Re-raise HTTPException to see original traceback
except Exception: except Exception:
raise # Re-raise generic Exception to see original traceback raise # Re-raise generic Exception to see original traceback

View File

@ -1,5 +1,6 @@
from .views.auth import LoginView, RegistrationView, LogoutView, ForgotPasswordView, ResetPasswordView from .views.auth import LoginView, RegistrationView, LogoutView, ForgotPasswordView, ResetPasswordView
from .views.site import SiteView, OrderView, FileBrowserView, UserManagementView from .views.site import SiteView, OrderView, FileBrowserView, UserManagementView
from .views.upload import UploadView
from .views.admin import get_users, add_user, update_user_quota, delete_user, get_user_details, delete_team from .views.admin import get_users, add_user, update_user_quota, delete_user, get_user_details, delete_team
@ -30,10 +31,14 @@ def setup_routes(app):
app.router.add_post("/users/{email}/delete", UserManagementView, name="delete_user_page") app.router.add_post("/users/{email}/delete", UserManagementView, name="delete_user_page")
app.router.add_view("/files", FileBrowserView, name="file_browser") app.router.add_view("/files", FileBrowserView, name="file_browser")
app.router.add_post("/files/new_folder", FileBrowserView, name="new_folder") app.router.add_post("/files/new_folder", FileBrowserView, name="new_folder")
app.router.add_post("/files/upload", FileBrowserView, name="upload_file") app.router.add_post("/files/upload", UploadView, name="upload_file")
app.router.add_get("/files/download/{file_path:.*}", FileBrowserView, name="download_file") app.router.add_get("/files/download/{file_path:.*}", FileBrowserView.get_download_file, name="download_file")
app.router.add_post("/files/share/{file_path:.*}", FileBrowserView, name="share_file") app.router.add_post("/files/share/{file_path:.*}", FileBrowserView, name="share_file")
app.router.add_post("/files/delete/{file_path:.*}", FileBrowserView, name="delete_item") app.router.add_post("/files/delete/{file_path:.*}", FileBrowserView, name="delete_item")
app.router.add_post("/files/delete_multiple", FileBrowserView, name="delete_multiple_items")
app.router.add_post("/files/share_multiple", FileBrowserView, name="share_multiple_items")
app.router.add_get("/shared_file/{share_id}", FileBrowserView.shared_file_handler, name="shared_file")
app.router.add_get("/shared_file/{share_id}/download", FileBrowserView.download_shared_file_handler, name="download_shared_file")
# Admin API routes for user and team management # Admin API routes for user and team management
app.router.add_get("/api/users", get_users, name="api_get_users") app.router.add_get("/api/users", get_users, name="api_get_users")

View File

@ -12,8 +12,11 @@ class ConfigService:
def _load_config(self): def _load_config(self):
config_from_file = {} config_from_file = {}
if self._config_path.exists(): if self._config_path.exists():
try:
with open(self._config_path, "r") as f: with open(self._config_path, "r") as f:
config_from_file = json.load(f) config_from_file = json.load(f)
except json.JSONDecodeError:
config_from_file = {}
# Override with environment variables # Override with environment variables
config_from_env = { config_from_env = {

View File

@ -4,35 +4,53 @@ from pathlib import Path
import shutil import shutil
import uuid import uuid
import datetime import datetime
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class FileService: class FileService:
def __init__(self, base_dir: Path, users_data_path: Path): def __init__(self, base_dir: Path, users_data_path: Path):
self.base_dir = base_dir self.base_dir = base_dir
self.users_data_path = users_data_path self.users_data_path = users_data_path
self.base_dir.mkdir(parents=True, exist_ok=True) self.base_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"FileService initialized with base_dir: {self.base_dir} and users_data_path: {self.users_data_path}")
async def _load_users_data(self): async def _load_users_data(self):
"""Loads user data from the JSON file.""" """Loads user data from the JSON file."""
if not self.users_data_path.exists(): if not self.users_data_path.exists():
return {} logger.warning(f"users_data_path does not exist: {self.users_data_path}")
return []
async with aiofiles.open(self.users_data_path, mode="r") as f: async with aiofiles.open(self.users_data_path, mode="r") as f:
content = await f.read() content = await f.read()
return json.loads(content) if content else {} try:
return json.loads(content) if content else []
except json.JSONDecodeError:
logger.error(f"JSONDecodeError when loading users data from {self.users_data_path}")
return []
async def _save_users_data(self, data): async def _save_users_data(self, data):
"""Saves user data to the JSON file.""" """Saves user data to the JSON file."""
async with aiofiles.open(self.users_data_path, mode="w") as f: async with aiofiles.open(self.users_data_path, mode="w") as f:
await f.write(json.dumps(data, indent=4)) await f.write(json.dumps(data, indent=4))
logger.debug(f"Saved users data to {self.users_data_path}")
def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path: def _get_user_file_path(self, user_email: str, relative_path: str = "") -> Path:
"""Constructs the absolute path for a user's file or directory.""" """Constructs the absolute path for a user's file or directory."""
user_dir = self.base_dir / user_email user_dir = self.base_dir / user_email
return user_dir / relative_path full_path = user_dir / relative_path
logger.debug(f"Constructed path for user '{user_email}', relative_path '{relative_path}': {full_path}")
return full_path
async def list_files(self, user_email: str, path: str = "") -> list: async def list_files(self, user_email: str, path: str = "") -> list:
"""Lists files and directories for a given user within a specified path.""" """Lists files and directories for a given user within a specified path."""
user_path = self._get_user_file_path(user_email, path) user_path = self._get_user_file_path(user_email, path)
if not user_path.is_dir(): if not user_path.is_dir():
logger.warning(f"list_files: User path is not a directory or does not exist: {user_path}")
return [] return []
files_list = [] files_list = []
@ -47,14 +65,17 @@ class FileService:
"last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(), "last_modified": datetime.datetime.fromtimestamp(item.stat().st_mtime).isoformat(),
} }
files_list.append(file_info) files_list.append(file_info)
logger.debug(f"Listed {len(files_list)} items for user '{user_email}' in path '{path}'")
return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower())) return sorted(files_list, key=lambda x: (not x["is_dir"], x["name"].lower()))
async def create_folder(self, user_email: str, folder_path: str) -> bool: async def create_folder(self, user_email: str, folder_path: str) -> bool:
"""Creates a new folder for the user.""" """Creates a new folder for the user."""
full_path = self._get_user_file_path(user_email, folder_path) full_path = self._get_user_file_path(user_email, folder_path)
if full_path.exists(): if full_path.exists():
logger.warning(f"create_folder: Folder already exists: {full_path}")
return False # Folder already exists return False # Folder already exists
full_path.mkdir(parents=True, exist_ok=True) full_path.mkdir(parents=True, exist_ok=True)
logger.info(f"create_folder: Folder created: {full_path}")
return True return True
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool: async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
@ -63,38 +84,49 @@ class FileService:
full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist full_path.parent.mkdir(parents=True, exist_ok=True) # Ensure parent directories exist
async with aiofiles.open(full_path, mode="wb") as f: async with aiofiles.open(full_path, mode="wb") as f:
await f.write(content) await f.write(content)
logger.info(f"upload_file: File uploaded to: {full_path}")
return True return True
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None: async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
"""Downloads a file for the user.""" """Downloads a file for the user."""
full_path = self._get_user_file_path(user_email, file_path) full_path = self._get_user_file_path(user_email, file_path)
logger.debug(f"download_file: Attempting to download file from: {full_path}")
if full_path.is_file(): if full_path.is_file():
async with aiofiles.open(full_path, mode="rb") as f: async with aiofiles.open(full_path, mode="rb") as f:
content = await f.read() content = await f.read()
logger.info(f"download_file: Successfully read file: {full_path}")
return content, full_path.name return content, full_path.name
logger.warning(f"download_file: File not found or is not a file: {full_path}")
return None return None
async def delete_item(self, user_email: str, item_path: str) -> bool: async def delete_item(self, user_email: str, item_path: str) -> bool:
"""Deletes a file or folder for the user.""" """Deletes a file or folder for the user."""
full_path = self._get_user_file_path(user_email, item_path) full_path = self._get_user_file_path(user_email, item_path)
logger.debug(f"delete_item: Attempting to delete item: {full_path}")
if not full_path.exists(): if not full_path.exists():
logger.warning(f"delete_item: Item does not exist: {full_path}")
return False return False
if full_path.is_file(): if full_path.is_file():
full_path.unlink() full_path.unlink()
logger.info(f"delete_item: File deleted: {full_path}")
elif full_path.is_dir(): elif full_path.is_dir():
shutil.rmtree(full_path) shutil.rmtree(full_path)
logger.info(f"delete_item: Directory deleted: {full_path}")
return True return True
async def generate_share_link(self, user_email: str, item_path: str) -> str | None: async def generate_share_link(self, user_email: str, item_path: str) -> str | None:
"""Generates a shareable link for a file or folder.""" """Generates a shareable link for a file or folder."""
logger.debug(f"generate_share_link: Generating link for user '{user_email}', item '{item_path}'")
users_data = await self._load_users_data() users_data = await self._load_users_data()
user = users_data.get(user_email) user = next((u for u in users_data if u.get("email") == user_email), None)
if not user: if not user:
logger.warning(f"generate_share_link: User not found: {user_email}")
return None return None
full_path = self._get_user_file_path(user_email, item_path) full_path = self._get_user_file_path(user_email, item_path)
if not full_path.exists(): if not full_path.exists():
logger.warning(f"generate_share_link: Item does not exist: {full_path}")
return None return None
share_id = str(uuid.uuid4()) share_id = str(uuid.uuid4())
@ -107,37 +139,60 @@ class FileService:
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry "expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat(), # 7-day expiry
} }
await self._save_users_data(users_data) await self._save_users_data(users_data)
logger.info(f"generate_share_link: Share link generated with ID: {share_id} for item: {item_path}")
return share_id return share_id
async def get_shared_item(self, share_id: str) -> dict | None: async def get_shared_item(self, share_id: str) -> dict | None:
"""Retrieves information about a shared item.""" """Retrieves information about a shared item."""
logger.debug(f"get_shared_item: Retrieving shared item with ID: {share_id}")
users_data = await self._load_users_data() users_data = await self._load_users_data()
for user_email, user_info in users_data.items(): for user_info in users_data:
if "shared_items" in user_info and share_id in user_info["shared_items"]: if "shared_items" in user_info and share_id in user_info["shared_items"]:
shared_item = user_info["shared_items"][share_id] shared_item = user_info["shared_items"][share_id]
expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"]) expiry_time = datetime.datetime.fromisoformat(shared_item["expires_at"])
if expiry_time > datetime.datetime.now(datetime.timezone.utc): if expiry_time > datetime.datetime.now(datetime.timezone.utc):
logger.info(f"get_shared_item: Found valid shared item for ID: {share_id}")
return shared_item return shared_item
else:
logger.warning(f"get_shared_item: Shared item {share_id} has expired.")
logger.warning(f"get_shared_item: No valid shared item found for ID: {share_id}")
return None return None
async def get_shared_file_content(self, share_id: str) -> tuple[bytes, str] | None: async def get_shared_file_content(self, share_id: str, requested_file_path: str | None = None) -> tuple[bytes, str] | None:
"""Retrieves the content of a shared file.""" """Retrieves the content of a shared file."""
logger.debug(f"get_shared_file_content: Retrieving content for shared file with ID: {share_id}, requested_file_path: {requested_file_path}")
shared_item = await self.get_shared_item(share_id) shared_item = await self.get_shared_item(share_id)
if not shared_item: if not shared_item:
return None return None
user_email = shared_item["user_email"] user_email = shared_item["user_email"]
item_path = shared_item["item_path"] item_path = shared_item["item_path"] # This is the path of the originally shared item (file or folder)
full_path = self._get_user_file_path(user_email, item_path)
if full_path.is_file(): # Construct the full path to the originally shared item
async with aiofiles.open(full_path, mode="rb") as f: full_shared_item_path = self._get_user_file_path(user_email, item_path)
target_file_path = full_shared_item_path
if requested_file_path:
# If a specific file within a shared folder is requested
target_file_path = self._get_user_file_path(user_email, requested_file_path)
# Security check: Ensure the requested file is actually within the shared item's directory
try:
target_file_path.relative_to(full_shared_item_path)
except ValueError:
logger.warning(f"get_shared_file_content: Requested file path '{requested_file_path}' is not within shared item path '{item_path}' for share_id: {share_id}")
return None
if target_file_path.is_file():
async with aiofiles.open(target_file_path, mode="rb") as f:
content = await f.read() content = await f.read()
return content, full_path.name logger.info(f"get_shared_file_content: Successfully read content for shared file: {target_file_path}")
return content, target_file_path.name
logger.warning(f"get_shared_file_content: Shared item path is not a file or does not exist: {target_file_path}")
return None return None
async def get_shared_folder_content(self, share_id: str) -> list | None: async def get_shared_folder_content(self, share_id: str) -> list | None:
"""Retrieves the content of a shared folder.""" """Retrieves the content of a shared folder."""
logger.debug(f"get_shared_folder_content: Retrieving content for shared folder with ID: {share_id}")
shared_item = await self.get_shared_item(share_id) shared_item = await self.get_shared_item(share_id)
if not shared_item: if not shared_item:
return None return None
@ -147,5 +202,7 @@ class FileService:
full_path = self._get_user_file_path(user_email, item_path) full_path = self._get_user_file_path(user_email, item_path)
if full_path.is_dir(): if full_path.is_dir():
logger.info(f"get_shared_folder_content: Listing files for shared folder: {full_path}")
return await self.list_files(user_email, item_path) return await self.list_files(user_email, item_path)
logger.warning(f"get_shared_folder_content: Shared item path is not a directory or does not exist: {full_path}")
return None return None

View File

@ -14,8 +14,11 @@ class UserService:
def _load_users(self) -> List[Dict[str, Any]]: def _load_users(self) -> List[Dict[str, Any]]:
if not self._users_path.exists(): if not self._users_path.exists():
return [] return []
try:
with open(self._users_path, "r") as f: with open(self._users_path, "r") as f:
return json.load(f) return json.load(f)
except json.JSONDecodeError:
return []
def _save_users(self): def _save_users(self):
with open(self._users_path, "w") as f: with open(self._users_path, "w") as f:

View File

@ -65,8 +65,8 @@
} }
.btn-small { .btn-small {
padding: 0.4rem 0.8rem; padding: 0.2rem 0.4rem; /* Reduced padding */
font-size: 0.85rem; font-size: 0.75rem; /* Reduced font size */
border: 1px solid var(--border-color); border: 1px solid var(--border-color);
border-radius: 4px; border-radius: 4px;
background-color: var(--background-color); background-color: var(--background-color);
@ -236,3 +236,107 @@
min-width: 120px; min-width: 120px;
} }
} }
/* Styles for the new upload functionality */
.upload-area {
border: 2px dashed var(--border-color);
border-radius: 8px;
padding: 20px;
text-align: center;
margin-bottom: 20px;
background-color: var(--background-color);
}
.upload-button {
display: inline-block;
padding: 10px 20px;
cursor: pointer;
margin-top: 10px;
}
.selected-files-preview {
margin-top: 20px;
text-align: left;
max-height: 200px;
overflow-y: auto;
border-top: 1px solid var(--border-color);
padding-top: 10px;
}
.file-entry {
display: flex;
align-items: center;
padding: 8px 0;
border-bottom: 1px solid var(--border-color);
}
.file-entry:last-child {
border-bottom: none;
}
.file-entry .file-name {
flex-grow: 1;
margin-right: 10px;
color: var(--text-color);
}
.file-entry .file-size {
color: var(--light-text-color);
font-size: 0.85rem;
}
.file-entry .thumbnail-preview {
width: 40px;
height: 40px;
margin-left: 10px;
border-radius: 4px;
overflow: hidden;
display: flex;
align-items: center;
justify-content: center;
background-color: var(--background-color);
border: 1px solid var(--border-color);
}
.file-entry .thumbnail-preview img {
max-width: 100%;
max-height: 100%;
display: block;
object-fit: contain;
}
.progress-bar-container {
margin-top: 15px;
text-align: left;
border-top: 1px solid var(--border-color);
padding-top: 10px;
}
.progress-bar-container .file-name {
font-weight: bold;
margin-bottom: 5px;
color: var(--text-color);
}
.progress-bar-wrapper {
width: 100%;
background-color: var(--background-color);
border-radius: 5px;
overflow: hidden;
height: 10px;
margin-bottom: 5px;
}
.progress-bar {
height: 100%;
width: 0%;
background-color: var(--accent-color);
border-radius: 5px;
transition: width 0.3s ease-in-out;
}
.progress-text {
font-size: 0.85rem;
color: var(--light-text-color);
text-align: right;
}

View File

@ -0,0 +1,116 @@
export function showUploadModal() {
document.getElementById('upload-modal').style.display = 'block';
// Clear previous selections and progress
document.getElementById('selected-files-preview').innerHTML = '';
document.getElementById('upload-progress-container').innerHTML = '';
document.getElementById('file-input-multiple').value = ''; // Clear selected files
document.getElementById('start-upload-btn').disabled = true;
}
document.addEventListener('DOMContentLoaded', () => {
const fileInput = document.getElementById('file-input-multiple');
const selectedFilesPreview = document.getElementById('selected-files-preview');
const startUploadBtn = document.getElementById('start-upload-btn');
const uploadProgressContainer = document.getElementById('upload-progress-container');
let filesToUpload = [];
fileInput.addEventListener('change', (event) => {
filesToUpload = Array.from(event.target.files);
selectedFilesPreview.innerHTML = ''; // Clear previous previews
uploadProgressContainer.innerHTML = ''; // Clear previous progress bars
if (filesToUpload.length > 0) {
startUploadBtn.disabled = false;
filesToUpload.forEach(file => {
const fileEntry = document.createElement('div');
fileEntry.className = 'file-entry';
fileEntry.innerHTML = `
<span class="file-name">${file.name}</span>
<span class="file-size">(${(file.size / 1024 / 1024).toFixed(2)} MB)</span>
<div class="thumbnail-preview"></div>
`;
selectedFilesPreview.appendChild(fileEntry);
// Display thumbnail for image files
if (file.type.startsWith('image/')) {
const reader = new FileReader();
reader.onload = (e) => {
const img = document.createElement('img');
img.src = e.target.result;
fileEntry.querySelector('.thumbnail-preview').appendChild(img);
};
reader.readAsDataURL(file);
}
});
} else {
startUploadBtn.disabled = true;
}
});
startUploadBtn.addEventListener('click', () => {
if (filesToUpload.length > 0) {
uploadFiles(filesToUpload);
}
});
async function uploadFiles(files) {
startUploadBtn.disabled = true; // Disable button during upload
uploadProgressContainer.innerHTML = ''; // Clear previous progress
const currentPath = new URLSearchParams(window.location.search).get('path') || '';
for (const file of files) {
const formData = new FormData();
formData.append('file', file);
const progressBarContainer = document.createElement('div');
progressBarContainer.className = 'progress-bar-container';
progressBarContainer.innerHTML = `
<div class="file-name">${file.name}</div>
<div class="progress-bar-wrapper">
<div class="progress-bar" id="progress-${file.name.replace(/\./g, '-')}" style="width: 0%;"></div>
</div>
<div class="progress-text" id="progress-text-${file.name.replace(/\./g, '-')}" >0%</div>
`;
uploadProgressContainer.appendChild(progressBarContainer);
const xhr = new XMLHttpRequest();
xhr.open('POST', `/files/upload?current_path=${encodeURIComponent(currentPath)}`, true);
xhr.upload.addEventListener('progress', (event) => {
if (event.lengthComputable) {
const percent = (event.loaded / event.total) * 100;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `${percent}%`;
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `${Math.round(percent)}%`;
}
});
xhr.addEventListener('load', () => {
if (xhr.status === 200) {
console.log(`File ${file.name} uploaded successfully.`);
// Update progress to 100% on completion
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.width = `100%`;
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `100% (Done)`;
} else {
console.error(`Error uploading ${file.name}: ${xhr.statusText}`);
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Failed (${xhr.status})`;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
}
});
xhr.addEventListener('error', () => {
console.error(`Network error uploading ${file.name}.`);
document.getElementById(`progress-text-${file.name.replace(/\./g, '-')}`).textContent = `Network Error`;
document.getElementById(`progress-${file.name.replace(/\./g, '-')}`).style.backgroundColor = `red`;
});
xhr.send(formData);
}
// After all files are sent, refresh the page to show new files
// A small delay to allow server to process and update file list
setTimeout(() => {
window.location.reload();
}, 1000);
}
});

View File

@ -1,5 +1,6 @@
import './components/slider.js'; import './components/slider.js';
import './components/navigation.js'; // Assuming navigation.js might be needed globally import './components/navigation.js'; // Assuming navigation.js might be needed globally
import { showUploadModal } from './components/upload.js'; // Import showUploadModal
document.addEventListener('DOMContentLoaded', () => { document.addEventListener('DOMContentLoaded', () => {
// Logic for custom-slider on order page // Logic for custom-slider on order page
@ -82,4 +83,262 @@ document.addEventListener('DOMContentLoaded', () => {
// Initial display based on active button (default to monthly) // Initial display based on active button (default to monthly)
updatePricingDisplay('monthly'); updatePricingDisplay('monthly');
} }
// --- File Browser Specific Logic ---
// Helper functions for modals
function showNewFolderModal() {
document.getElementById('new-folder-modal').style.display = 'block';
}
function closeModal(modalId) {
document.getElementById(modalId).style.display = 'none';
}
window.closeModal = closeModal; // Make it globally accessible
window.onclick = function(event) {
if (event.target.classList.contains('modal')) {
event.target.style.display = 'none';
}
}
// File actions
function downloadFile(path) {
window.location.href = `/files/download/${path}`;
}
async function shareFile(paths, names) {
const modal = document.getElementById('share-modal');
const linkContainer = document.getElementById('share-link-container');
const loading = document.getElementById('share-loading');
const shareLinkInput = document.getElementById('share-link-input');
const shareFileName = document.getElementById('share-file-name');
const shareLinksList = document.getElementById('share-links-list'); // New element for multiple links
// Clear previous content
shareLinkInput.value = '';
if (shareLinksList) shareLinksList.innerHTML = '';
linkContainer.style.display = 'none';
loading.style.display = 'block';
modal.style.display = 'block';
if (paths.length === 1) {
shareFileName.textContent = `Sharing: ${names[0]}`;
} else {
shareFileName.textContent = `Sharing ${paths.length} items`;
}
try {
const response = await fetch(`/files/share_multiple`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ paths: paths })
});
const data = await response.json();
if (data.share_links && data.share_links.length > 0) {
if (data.share_links.length === 1) {
shareLinkInput.value = data.share_links[0];
linkContainer.style.display = 'block';
} else {
// Display multiple links
if (!shareLinksList) {
// Create the list if it doesn't exist
const ul = document.createElement('ul');
ul.id = 'share-links-list';
linkContainer.appendChild(ul);
shareLinksList = ul;
}
data.share_links.forEach(item => {
const li = document.createElement('li');
li.innerHTML = `<strong>${item.name}:</strong> <input type="text" value="${item.link}" readonly class="form-input share-link-item-input"> <button class="btn-primary copy-share-link-item-btn" data-link="${item.link}">Copy</button>`;
shareLinksList.appendChild(li);
});
linkContainer.style.display = 'block';
}
loading.style.display = 'none';
} else {
loading.textContent = 'Error generating share link(s)';
}
} catch (error) {
console.error('Error sharing files:', error);
loading.textContent = 'Error generating share link(s)';
}
}
function copyShareLink() {
const input = document.getElementById('share-link-input');
input.select();
document.execCommand('copy');
alert('Share link copied to clipboard');
}
function deleteFile(paths, names) {
const deleteForm = document.getElementById('delete-form');
const deleteMessage = document.getElementById('delete-message');
const deleteModal = document.getElementById('delete-modal');
// Clear previous hidden inputs
deleteForm.querySelectorAll('input[name="paths[]"]').forEach(input => input.remove());
if (Array.isArray(paths) && paths.length > 1) {
deleteMessage.textContent = `Are you sure you want to delete ${paths.length} items? This action cannot be undone.`;
deleteForm.action = `/files/delete_multiple`;
paths.forEach(path => {
const input = document.createElement('input');
input.type = 'hidden';
input.name = 'paths[]';
input.value = path;
deleteForm.appendChild(input);
});
} else {
const path = Array.isArray(paths) ? paths[0] : paths;
const name = Array.isArray(names) ? names[0] : names;
deleteMessage.textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
deleteForm.action = `/files/delete/${path}`;
}
deleteModal.style.display = 'block';
}
// Selection and action buttons
function updateActionButtons() {
const checked = document.querySelectorAll('.file-checkbox:checked');
const downloadBtn = document.getElementById('download-selected-btn');
const shareBtn = document.getElementById('share-selected-btn');
const deleteBtn = document.getElementById('delete-selected-btn');
const hasSelection = checked.length > 0;
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
if (downloadBtn) downloadBtn.disabled = !hasFiles;
if (shareBtn) shareBtn.disabled = !hasSelection;
if (deleteBtn) deleteBtn.disabled = !hasSelection;
}
function downloadSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
downloadFile(checked[0].dataset.path);
}
}
function shareSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
shareFile(paths, names);
}
}
function deleteSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
deleteFile(paths, names);
}
}
// Event Listeners for File Browser
const newFolderBtn = document.getElementById('new-folder-btn');
if (newFolderBtn) {
console.log('Attaching event listener to new-folder-btn');
newFolderBtn.addEventListener('click', showNewFolderModal);
}
const uploadBtn = document.getElementById('upload-btn');
if (uploadBtn) {
console.log('Attaching event listener to upload-btn');
uploadBtn.addEventListener('click', showUploadModal);
}
const createFirstFolderBtn = document.getElementById('create-first-folder-btn');
if (createFirstFolderBtn) {
console.log('Attaching event listener to create-first-folder-btn');
createFirstFolderBtn.addEventListener('click', showNewFolderModal);
}
const uploadFirstFileBtn = document.getElementById('upload-first-file-btn');
if (uploadFirstFileBtn) {
console.log('Attaching event listener to upload-first-file-btn');
uploadFirstFileBtn.addEventListener('click', showUploadModal);
}
const downloadSelectedBtn = document.getElementById('download-selected-btn');
if (downloadSelectedBtn) {
downloadSelectedBtn.addEventListener('click', downloadSelected);
}
const shareSelectedBtn = document.getElementById('share-selected-btn');
if (shareSelectedBtn) {
shareSelectedBtn.addEventListener('click', shareSelected);
}
const deleteSelectedBtn = document.getElementById('delete-selected-btn');
if (deleteSelectedBtn) {
deleteSelectedBtn.addEventListener('click', deleteSelected);
}
const copyShareLinkBtn = document.getElementById('copy-share-link-btn');
if (copyShareLinkBtn) {
copyShareLinkBtn.addEventListener('click', copyShareLink);
}
document.getElementById('select-all')?.addEventListener('change', function(e) {
const checkboxes = document.querySelectorAll('.file-checkbox');
checkboxes.forEach(cb => cb.checked = e.target.checked);
updateActionButtons();
});
document.querySelectorAll('.file-checkbox').forEach(cb => {
cb.addEventListener('change', updateActionButtons);
});
// Event listeners for dynamically created download/share/delete buttons
document.addEventListener('click', (event) => {
if (event.target.classList.contains('download-file-btn')) {
downloadFile(event.target.dataset.path);
} else if (event.target.classList.contains('share-file-btn')) {
const path = event.target.dataset.path;
const name = event.target.dataset.name;
shareFile([path], [name]);
} else if (event.target.classList.contains('delete-file-btn')) {
const path = event.target.dataset.path;
const name = event.target.dataset.name;
deleteFile([path], [name]);
} else if (event.target.classList.contains('copy-share-link-item-btn')) {
const linkToCopy = event.target.dataset.link;
navigator.clipboard.writeText(linkToCopy).then(() => {
alert('Share link copied to clipboard');
}).catch(err => {
console.error('Failed to copy link: ', err);
alert('Failed to copy link');
});
}
});
document.getElementById('search-bar')?.addEventListener('input', function(e) {
const searchTerm = e.target.value.toLowerCase();
const rows = document.querySelectorAll('#file-list-body tr');
rows.forEach(row => {
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
if (name && name.includes(searchTerm)) {
row.style.display = '';
} else {
row.style.display = 'none';
}
});
});
// Initial update of action buttons
updateActionButtons();
}); });

View File

@ -9,11 +9,11 @@
{% block page_title %}My Files{% endblock %} {% block page_title %}My Files{% endblock %}
{% block dashboard_actions %} {% block dashboard_actions %}
<button class="btn-primary" onclick="showNewFolderModal()">+ New</button> <button class="btn-primary" id="new-folder-btn">+ New</button>
<button class="btn-outline" onclick="showUploadModal()">Upload</button> <button class="btn-outline" id="upload-btn">Upload</button>
<button class="btn-outline" onclick="downloadSelected()" id="download-btn" disabled>Download</button> <button class="btn-outline" id="download-selected-btn" disabled>⬇️</button>
<button class="btn-outline" onclick="shareSelected()" id="share-btn" disabled>Share</button> <button class="btn-outline" id="share-selected-btn" disabled>đź”—</button>
<button class="btn-outline" onclick="deleteSelected()" id="delete-btn" disabled>Delete</button> <button class="btn-outline" id="delete-selected-btn" disabled>Delete</button>
{% endblock %} {% endblock %}
{% block dashboard_content %} {% block dashboard_content %}
@ -70,10 +70,10 @@
<td> <td>
<div class="action-buttons"> <div class="action-buttons">
{% if not item.is_dir %} {% if not item.is_dir %}
<button class="btn-small" onclick="downloadFile('{{ item.path }}')">Download</button> <button class="btn-small download-file-btn" data-path="{{ item.path }}">⬇️</button>
{% endif %} {% endif %}
<button class="btn-small" onclick="shareFile('{{ item.path }}', '{{ item.name }}')">Share</button> <button class="btn-small share-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">đź”—</button>
<button class="btn-small btn-danger" onclick="deleteFile('{{ item.path }}', '{{ item.name }}')">Delete</button> <button class="btn-small btn-danger delete-file-btn" data-path="{{ item.path }}" data-name="{{ item.name }}">Delete</button>
</div> </div>
</td> </td>
</tr> </tr>
@ -82,8 +82,8 @@
<tr> <tr>
<td colspan="6" style="text-align: center; padding: 40px;"> <td colspan="6" style="text-align: center; padding: 40px;">
<p>No files found in this directory.</p> <p>No files found in this directory.</p>
<button class="btn-primary" onclick="showNewFolderModal()" style="margin-top: 10px;">Create your first folder</button> <button class="btn-primary" id="create-first-folder-btn" style="margin-top: 10px;">Create your first folder</button>
<button class="btn-outline" onclick="showUploadModal()" style="margin-top: 10px;">Upload a file</button> <button class="btn-outline" id="upload-first-file-btn" style="margin-top: 10px;">Upload a file</button>
</td> </td>
</tr> </tr>
{% endif %} {% endif %}
@ -108,15 +108,17 @@
<div id="upload-modal" class="modal"> <div id="upload-modal" class="modal">
<div class="modal-content"> <div class="modal-content">
<span class="close" onclick="closeModal('upload-modal')">&times;</span> <span class="close" onclick="closeModal('upload-modal')">&times;</span>
<h3>Upload File</h3> <h3>Upload Files</h3>
<form action="/files/upload" method="post" enctype="multipart/form-data"> <div class="upload-area">
<input type="file" name="file" required class="form-input" id="file-input"> <input type="file" name="file" multiple class="form-input" id="file-input-multiple" style="display: none;">
<div class="file-info" id="file-info"></div> <label for="file-input-multiple" class="btn-outline upload-button">Select Files</label>
<div id="selected-files-preview" class="selected-files-preview"></div>
<div id="upload-progress-container" class="upload-progress-container"></div>
</div>
<div class="modal-actions"> <div class="modal-actions">
<button type="submit" class="btn-primary">Upload</button> <button type="button" class="btn-primary" id="start-upload-btn" disabled>Upload</button>
<button type="button" class="btn-outline" onclick="closeModal('upload-modal')">Cancel</button> <button type="button" class="btn-outline" onclick="closeModal('upload-modal')">Cancel</button>
</div> </div>
</form>
</div> </div>
</div> </div>
@ -127,7 +129,8 @@
<p id="share-file-name"></p> <p id="share-file-name"></p>
<div id="share-link-container" style="display: none;"> <div id="share-link-container" style="display: none;">
<input type="text" id="share-link-input" readonly class="form-input"> <input type="text" id="share-link-input" readonly class="form-input">
<button class="btn-primary" onclick="copyShareLink()">Copy Link</button> <button class="btn-primary" id="copy-share-link-btn">Copy Link</button>
<div id="share-links-list" class="share-links-list"></div>
</div> </div>
<div id="share-loading">Generating share link...</div> <div id="share-loading">Generating share link...</div>
<div class="modal-actions"> <div class="modal-actions">
@ -150,146 +153,6 @@
</div> </div>
</div> </div>
<script> <script type="module" src="/static/js/components/upload.js"></script>
function showNewFolderModal() { <script type="module" src="/static/js/main.js"></script>
document.getElementById('new-folder-modal').style.display = 'block';
}
function showUploadModal() {
document.getElementById('upload-modal').style.display = 'block';
}
function closeModal(modalId) {
document.getElementById(modalId).style.display = 'none';
}
window.onclick = function(event) {
if (event.target.classList.contains('modal')) {
event.target.style.display = 'none';
}
}
document.getElementById('file-input').addEventListener('change', function(e) {
const file = e.target.files[0];
if (file) {
const size = (file.size / 1024 / 1024).toFixed(2);
document.getElementById('file-info').innerHTML = `Selected: ${file.name} (${size} MB)`;
}
});
function downloadFile(path) {
window.location.href = `/files/download/${path}`;
}
async function shareFile(path, name) {
const modal = document.getElementById('share-modal');
const linkContainer = document.getElementById('share-link-container');
const loading = document.getElementById('share-loading');
document.getElementById('share-file-name').textContent = `Sharing: ${name}`;
linkContainer.style.display = 'none';
loading.style.display = 'block';
modal.style.display = 'block';
try {
const response = await fetch(`/files/share/${path}`, {
method: 'POST'
});
const data = await response.json();
if (data.share_link) {
document.getElementById('share-link-input').value = data.share_link;
linkContainer.style.display = 'block';
loading.style.display = 'none';
}
} catch (error) {
loading.textContent = 'Error generating share link';
}
}
function copyShareLink() {
const input = document.getElementById('share-link-input');
input.select();
document.execCommand('copy');
alert('Share link copied to clipboard');
}
function deleteFile(path, name) {
document.getElementById('delete-message').textContent = `Are you sure you want to delete "${name}"? This action cannot be undone.`;
document.getElementById('delete-form').action = `/files/delete/${path}`;
document.getElementById('delete-modal').style.display = 'block';
}
document.getElementById('select-all').addEventListener('change', function(e) {
const checkboxes = document.querySelectorAll('.file-checkbox');
checkboxes.forEach(cb => cb.checked = e.target.checked);
updateActionButtons();
});
document.querySelectorAll('.file-checkbox').forEach(cb => {
cb.addEventListener('change', updateActionButtons);
});
function updateActionButtons() {
const checked = document.querySelectorAll('.file-checkbox:checked');
const downloadBtn = document.getElementById('download-btn');
const shareBtn = document.getElementById('share-btn');
const deleteBtn = document.getElementById('delete-btn');
const hasSelection = checked.length > 0;
const hasFiles = Array.from(checked).some(cb => cb.dataset.isDir === 'False');
downloadBtn.disabled = !hasFiles;
shareBtn.disabled = !hasSelection;
deleteBtn.disabled = !hasSelection;
}
function downloadSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1 && checked[0].dataset.isDir === 'False') {
downloadFile(checked[0].dataset.path);
}
}
function shareSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length === 1) {
const path = checked[0].dataset.path;
const name = checked[0].closest('tr').querySelector('td:nth-child(2)').textContent.trim();
shareFile(path, name);
}
}
function deleteSelected() {
const checked = document.querySelectorAll('.file-checkbox:checked');
if (checked.length > 0) {
const paths = Array.from(checked).map(cb => cb.dataset.path);
const names = Array.from(checked).map(cb =>
cb.closest('tr').querySelector('td:nth-child(2)').textContent.trim()
);
if (checked.length === 1) {
deleteFile(paths[0], names[0]);
} else {
document.getElementById('delete-message').textContent =
`Are you sure you want to delete ${checked.length} items? This action cannot be undone.`;
document.getElementById('delete-modal').style.display = 'block';
}
}
}
document.getElementById('search-bar').addEventListener('input', function(e) {
const searchTerm = e.target.value.toLowerCase();
const rows = document.querySelectorAll('#file-list-body tr');
rows.forEach(row => {
const name = row.querySelector('td:nth-child(2)')?.textContent.toLowerCase();
if (name && name.includes(searchTerm)) {
row.style.display = '';
} else {
row.style.display = 'none';
}
});
});
</script>
{% endblock %} {% endblock %}

View File

@ -0,0 +1,64 @@
{% extends "layouts/base.html" %}
{% block title %}Shared Folder - Retoor's Cloud Solutions{% endblock %}
{% block head %}
<link rel="stylesheet" href="/static/css/components/file_browser.css">
{% endblock %}
{% block content %}
<div class="container dashboard-container">
<h1 class="page-title">Shared Folder</h1>
<div class="file-list-table">
<table>
<thead>
<tr>
<th>Name</th>
<th>Last Modified</th>
<th>Size</th>
<th>Actions</th>
</tr>
</thead>
<tbody id="file-list-body">
{% if files %}
{% for item in files %}
<tr data-path="{{ item.path }}" data-is-dir="{{ item.is_dir }}">
<td>
{% if item.is_dir %}
<img src="/static/images/icon-families.svg" alt="Folder Icon" class="file-icon">
<a href="/shared_file/{{ share_id }}?path={{ item.path }}">{{ item.name }}</a>
{% else %}
<img src="/static/images/icon-professionals.svg" alt="File Icon" class="file-icon">
{{ item.name }}
{% endif %}
</td>
<td>{{ item.last_modified[:10] }}</td>
<td>
{% if item.is_dir %}
--
{% else %}
{{ (item.size / 1024 / 1024)|round(2) }} MB
{% endif %}
</td>
<td>
<div class="action-buttons">
{% if not item.is_dir %}
<a href="/shared_file/{{ share_id }}/download?file_path={{ item.path }}" class="btn-small">⬇️</a>
{% endif %}
</div>
</td>
</tr>
{% endfor %}
{% else %}
<tr>
<td colspan="4" style="text-align: center; padding: 40px;">
<p>No files found in this shared directory.</p>
</td>
</tr>
{% endif %}
</tbody>
</table>
</div>
</div>
{% endblock %}

View File

@ -1,3 +1,4 @@
import logging
from aiohttp import web from aiohttp import web
import aiohttp_jinja2 import aiohttp_jinja2
import os import os
@ -7,6 +8,14 @@ from aiohttp.web_response import json_response
from ..helpers.auth import login_required from ..helpers.auth import login_required
from .auth import CustomPydanticView from .auth import CustomPydanticView
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# PROJECT_DIR is no longer directly used for user files, as FileService manages them # PROJECT_DIR is no longer directly used for user files, as FileService manages them
# PROJECT_DIR = Path(__file__).parent.parent.parent / "project" # PROJECT_DIR = Path(__file__).parent.parent.parent / "project"
@ -45,7 +54,7 @@ class SiteView(web.View):
@login_required @login_required
async def dashboard(self): async def dashboard(self):
return web.HTTPFound(self.request.app.router["file_browser"].url_for()) raise web.HTTPFound(self.request.app.router["file_browser"].url_for())
async def solutions(self): async def solutions(self):
return aiohttp_jinja2.render_template( return aiohttp_jinja2.render_template(
@ -126,9 +135,6 @@ class SiteView(web.View):
class FileBrowserView(web.View): class FileBrowserView(web.View):
@login_required @login_required
async def get(self): async def get(self):
if self.request.match_info.get("file_path") is not None:
return await self.get_download_file()
user_email = self.request["user"]["email"] user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"] file_service = self.request.app["file_service"]
@ -157,12 +163,14 @@ class FileBrowserView(web.View):
user_email = self.request["user"]["email"] user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"] file_service = self.request.app["file_service"]
route_name = self.request.match_info.route.name route_name = self.request.match_info.route.name
logger.debug(f"FileBrowserView: POST request for route: {route_name}")
if route_name == "new_folder": if route_name == "new_folder":
data = await self.request.post() data = await self.request.post()
folder_name = data.get("folder_name") folder_name = data.get("folder_name")
if not folder_name: if not folder_name:
return web.HTTPFound( logger.warning("FileBrowserView: New folder request missing folder_name")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
error="Folder name is required" error="Folder name is required"
) )
@ -170,74 +178,42 @@ class FileBrowserView(web.View):
success = await file_service.create_folder(user_email, folder_name) success = await file_service.create_folder(user_email, folder_name)
if success: if success:
return web.HTTPFound( logger.info(f"FileBrowserView: Folder '{folder_name}' created successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
success=f"Folder '{folder_name}' created successfully" success=f"Folder '{folder_name}' created successfully"
) )
) )
else: else:
return web.HTTPFound( logger.error(f"FileBrowserView: Failed to create folder '{folder_name}' for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
error=f"Folder '{folder_name}' already exists or could not be created" error=f"Folder '{folder_name}' already exists or could not be created"
) )
) )
elif route_name == "upload_file":
try:
reader = await self.request.multipart()
field = await reader.next()
if not field or field.name != "file":
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="No file selected for upload"
)
)
filename = field.filename
if not filename:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Filename is required"
)
)
content = await field.read()
success = await file_service.upload_file(user_email, filename, content)
if success:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"File '{filename}' uploaded successfully"
)
)
else:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error=f"Failed to upload file '{filename}'"
)
)
except Exception as e:
return web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error=f"Upload error: {str(e)}"
)
)
elif route_name == "share_file": elif route_name == "share_file":
file_path = self.request.match_info.get("file_path") file_path = self.request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Share file request for path: {file_path} by user {user_email}")
if not file_path: if not file_path:
logger.warning("FileBrowserView: Share file request missing file_path")
return json_response({"error": "File path is required for sharing"}, status=400) return json_response({"error": "File path is required for sharing"}, status=400)
share_id = await file_service.generate_share_link(user_email, file_path) share_id = await file_service.generate_share_link(user_email, file_path)
if share_id: if share_id:
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}" share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
logger.info(f"FileBrowserView: Share link generated: {share_link} for file: {file_path}")
return json_response({"share_link": share_link}) return json_response({"share_link": share_link})
else: else:
logger.error(f"FileBrowserView: Failed to generate share link for file: {file_path} by user {user_email}")
return json_response({"error": "Failed to generate share link"}, status=500) return json_response({"error": "Failed to generate share link"}, status=500)
elif route_name == "delete_item": elif route_name == "delete_item":
item_path = self.request.match_info.get("file_path") item_path = self.request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Delete item request for path: {item_path} by user {user_email}")
if not item_path: if not item_path:
return web.HTTPFound( logger.warning("FileBrowserView: Delete item request missing item_path")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
error="Item path is required for deletion" error="Item path is required for deletion"
) )
@ -245,38 +221,199 @@ class FileBrowserView(web.View):
success = await file_service.delete_item(user_email, item_path) success = await file_service.delete_item(user_email, item_path)
if success: if success:
return web.HTTPFound( logger.info(f"FileBrowserView: Item '{item_path}' deleted successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
success=f"Item deleted successfully" success=f"Item deleted successfully"
) )
) )
else: else:
return web.HTTPFound( logger.error(f"FileBrowserView: Failed to delete item '{item_path}' for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query( self.request.app.router["file_browser"].url_for().with_query(
error="Failed to delete item - it may not exist" error="Failed to delete item - it may not exist"
) )
) )
return web.HTTPBadRequest(text="Unknown file action") elif route_name == "delete_multiple_items":
data = await self.request.post()
paths = data.getall("paths[]", [])
logger.debug(f"FileBrowserView: Delete multiple items request for paths: {paths} by user {user_email}")
if not paths:
logger.warning("FileBrowserView: Delete multiple items request missing paths")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="No items selected for deletion"
)
)
all_successful = True
for path in paths:
success = await file_service.delete_item(user_email, path)
if not success:
all_successful = False
logger.error(f"FileBrowserView: Failed to delete item '{path}' for user {user_email} during bulk delete")
if all_successful:
logger.info(f"FileBrowserView: All {len(paths)} items deleted successfully for user {user_email}")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
success=f"Successfully deleted {len(paths)} items"
)
)
else:
logger.error(f"FileBrowserView: Some items failed to delete for user {user_email} during bulk delete")
raise web.HTTPFound(
self.request.app.router["file_browser"].url_for().with_query(
error="Some items failed to delete"
)
)
elif route_name == "share_multiple_items":
data = await self.request.json()
paths = data.get("paths", [])
logger.debug(f"FileBrowserView: Share multiple items request for paths: {paths} by user {user_email}")
if not paths:
logger.warning("FileBrowserView: Share multiple items request missing paths")
return json_response({"error": "No items selected for sharing"}, status=400)
share_links = []
for path in paths:
share_id = await file_service.generate_share_link(user_email, path)
if share_id:
share_link = f"{self.request.scheme}://{self.request.host}/shared_file/{share_id}"
# Extract file name from path
file_name = os.path.basename(path)
share_links.append({"name": file_name, "link": share_link})
else:
logger.error(f"FileBrowserView: Failed to generate share link for file: {path} by user {user_email}")
if share_links:
logger.info(f"FileBrowserView: Generated {len(share_links)} share links for user {user_email}")
return json_response({"share_links": share_links})
else:
logger.error(f"FileBrowserView: Failed to generate any share links for user {user_email}")
return json_response({"error": "Failed to generate share links for any selected items"}, status=500)
logger.warning(f"FileBrowserView: Unknown file action for POST request: {route_name}")
return web.Response(status=400, text="Unknown file action")
@login_required @login_required
async def get_download_file(self): async def get_download_file(self):
user_email = self.request["user"]["email"] request = self # self is the request object here
file_service = self.request.app["file_service"] user_email = request["user"]["email"]
file_path = self.request.match_info.get("file_path") file_service = request.app["file_service"]
file_path = request.match_info.get("file_path")
logger.debug(f"FileBrowserView: Download file request for path: {file_path} by user {user_email}")
if not file_path: if not file_path:
return web.HTTPBadRequest(text="File path is required for download") logger.warning("FileBrowserView: Download file request missing file_path")
raise web.HTTPBadRequest(text="File path is required for download")
result = await file_service.download_file(user_email, file_path) result = await file_service.download_file(user_email, file_path)
if result: if result:
content, filename = result content, filename = result
response = web.Response(body=content) response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}" response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream" response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: File '{filename}' downloaded successfully by user {user_email}")
return response return response
else: else:
return web.HTTPNotFound(text="File not found") logger.error(f"FileBrowserView: Failed to download file: {file_path} for user {user_email} - file not found or access denied")
raise web.HTTPNotFound(text="File not found")
async def shared_file_handler(self):
share_id = self.request.match_info.get("share_id")
file_service = self.request.app["file_service"]
logger.debug(f"FileBrowserView: Handling shared file request for share_id: {share_id}")
shared_item = await file_service.get_shared_item(share_id)
if not shared_item:
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id}")
return aiohttp_jinja2.render_template(
"pages/errors/404.html",
self.request,
{"request": self.request, "message": "Shared link is invalid or has expired."},
status=404
)
user_email = shared_item["user_email"]
item_path = shared_item["item_path"]
full_path = file_service._get_user_file_path(user_email, item_path)
if full_path.is_file():
result = await file_service.get_shared_file_content(share_id)
if result:
content, filename = result
response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: Serving shared file '{filename}' for share_id: {share_id}")
return response
else:
logger.error(f"FileBrowserView: Failed to get content for shared file: {item_path} (share_id: {share_id})")
raise web.HTTPNotFound(text="Shared file not found or inaccessible")
elif full_path.is_dir():
files = await file_service.get_shared_folder_content(share_id)
logger.info(f"FileBrowserView: Serving shared folder '{item_path}' for share_id: {share_id}")
return aiohttp_jinja2.render_template(
"pages/shared_folder.html",
self.request,
{
"request": self.request,
"files": files,
"current_path": item_path,
"share_id": share_id,
"user_email": user_email,
"active_page": "shared"
}
)
else:
logger.error(f"FileBrowserView: Shared item is neither file nor directory: {item_path} (share_id: {share_id})")
raise web.HTTPNotFound(text="Shared item not found")
async def download_shared_file_handler(self):
share_id = self.request.match_info.get("share_id")
file_path = self.request.query.get("file_path") # This is the path of the file *within* the shared item
file_service = self.request.app["file_service"]
logger.debug(f"FileBrowserView: Handling download shared file request for share_id: {share_id}, file_path: {file_path}")
if not file_path:
logger.warning("FileBrowserView: Download shared file request missing file_path query parameter.")
raise web.HTTPBadRequest(text="File path is required for download from shared folder.")
shared_item = await file_service.get_shared_item(share_id)
if not shared_item:
logger.warning(f"FileBrowserView: Shared item not found or expired for share_id: {share_id} during download.")
return aiohttp_jinja2.render_template(
"pages/errors/404.html",
self.request,
{"request": self.request, "message": "Shared link is invalid or has expired."},
status=404
)
# Ensure the shared item is a directory if a file_path is provided
user_email = shared_item["user_email"]
original_shared_item_path = file_service._get_user_file_path(user_email, shared_item["item_path"])
if not original_shared_item_path.is_dir():
logger.warning(f"FileBrowserView: Attempt to download a specific file from a shared item that is not a directory. Share_id: {share_id}")
raise web.HTTPBadRequest(text="Cannot download specific files from a shared item that is not a folder.")
result = await file_service.get_shared_file_content(share_id, file_path)
if result:
content, filename = result
response = web.Response(body=content)
response.headers["Content-Disposition"] = f"attachment; filename={filename}"
response.headers["Content-Type"] = "application/octet-stream"
logger.info(f"FileBrowserView: Serving shared file '{filename}' from shared folder for share_id: {share_id}")
return response
else:
logger.error(f"FileBrowserView: Failed to get content for shared file: {file_path} (share_id: {share_id}) from shared folder.")
raise web.HTTPNotFound(text="Shared file not found or inaccessible within the shared folder.")
class OrderView(CustomPydanticView): class OrderView(CustomPydanticView):
@ -323,7 +460,7 @@ class UserManagementView(web.View):
elif route_name == "user_details": elif route_name == "user_details":
return await self.user_details_page() return await self.user_details_page()
return web.HTTPNotFound() raise web.HTTPNotFound()
@login_required @login_required
async def post(self): async def post(self):
@ -336,7 +473,7 @@ class UserManagementView(web.View):
elif route_name == "delete_user_page": elif route_name == "delete_user_page":
return await self.delete_user_submit() return await self.delete_user_submit()
return web.HTTPNotFound() raise web.HTTPNotFound()
async def add_user_page(self): async def add_user_page(self):
return aiohttp_jinja2.render_template( return aiohttp_jinja2.render_template(
@ -403,7 +540,7 @@ class UserManagementView(web.View):
user_service.update_user_quota(email, float(storage_quota_gb)) user_service.update_user_quota(email, float(storage_quota_gb))
return web.HTTPFound( raise web.HTTPFound(
self.request.app.router["users"].url_for().with_query( self.request.app.router["users"].url_for().with_query(
success=f"User {email} added successfully" success=f"User {email} added successfully"
) )
@ -429,7 +566,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email) user_data = user_service.get_user_by_email(email)
if not user_data: if not user_data:
return web.HTTPNotFound(text="User not found") raise web.HTTPNotFound(text="User not found")
success_message = self.request.query.get("success") success_message = self.request.query.get("success")
@ -464,7 +601,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email) user_data = user_service.get_user_by_email(email)
if not user_data: if not user_data:
return web.HTTPNotFound(text="User not found") raise web.HTTPNotFound(text="User not found")
if errors: if errors:
return aiohttp_jinja2.render_template( return aiohttp_jinja2.render_template(
@ -481,7 +618,7 @@ class UserManagementView(web.View):
user_service.update_user_quota(email, storage_quota_gb) user_service.update_user_quota(email, storage_quota_gb)
return web.HTTPFound( raise web.HTTPFound(
self.request.app.router["edit_user"].url_for(email=email).with_query( self.request.app.router["edit_user"].url_for(email=email).with_query(
success="User quota updated successfully" success="User quota updated successfully"
) )
@ -494,7 +631,7 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email) user_data = user_service.get_user_by_email(email)
if not user_data: if not user_data:
return web.HTTPNotFound(text="User not found") raise web.HTTPNotFound(text="User not found")
return aiohttp_jinja2.render_template( return aiohttp_jinja2.render_template(
"pages/user_details.html", "pages/user_details.html",
@ -514,11 +651,11 @@ class UserManagementView(web.View):
user_data = user_service.get_user_by_email(email) user_data = user_service.get_user_by_email(email)
if not user_data: if not user_data:
return web.HTTPNotFound(text="User not found") raise web.HTTPNotFound(text="User not found")
user_service.delete_user(email) user_service.delete_user(email)
return web.HTTPFound( raise web.HTTPFound(
self.request.app.router["users"].url_for().with_query( self.request.app.router["users"].url_for().with_query(
success=f"User {email} deleted successfully" success=f"User {email} deleted successfully"
) )

50
retoors/views/upload.py Normal file
View File

@ -0,0 +1,50 @@
from aiohttp import web
import aiohttp_jinja2
from aiohttp.web_response import json_response
from ..helpers.auth import login_required
class UploadView(web.View):
@login_required
async def post(self):
user_email = self.request["user"]["email"]
file_service = self.request.app["file_service"]
# Get current path from query parameter or form data
current_path = self.request.query.get("current_path", "")
try:
reader = await self.request.multipart()
files_uploaded = []
errors = []
while True:
field = await reader.next()
if field is None:
break
# Check if the field is a file input
if field.name == "file": # Assuming the input field name is 'file'
filename = field.filename
if not filename:
errors.append("Filename is required for one of the files.")
continue
content = await field.read()
# Construct the full file path relative to the user's base directory
full_file_path_for_service = f"{current_path}/{filename}" if current_path else filename
success = await file_service.upload_file(user_email, full_file_path_for_service, content)
if success:
files_uploaded.append(filename)
else:
errors.append(f"Failed to upload file '{filename}'")
if errors:
return json_response({"status": "error", "message": "Some files failed to upload", "details": errors}, status=500)
elif files_uploaded:
return json_response({"status": "success", "message": f"Successfully uploaded {len(files_uploaded)} files", "files": files_uploaded})
else:
return json_response({"status": "error", "message": "No files were uploaded"}, status=400)
except Exception as e:
return json_response({"status": "error", "message": f"Upload error: {str(e)}"}, status=500)

View File

@ -5,6 +5,59 @@ from retoors.main import create_app
from retoors.services.config_service import ConfigService from retoors.services.config_service import ConfigService
from pytest_mock import MockerFixture # Import MockerFixture from pytest_mock import MockerFixture # Import MockerFixture
import datetime # Import datetime import datetime # Import datetime
from aiohttp.test_utils import TestClient # Import TestClient
from aiohttp_session import setup as setup_session # Import setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage # Import EncryptedCookieStorage
from retoors.helpers.env_manager import get_or_create_session_secret_key # Import get_or_create_session_secret_key
from retoors.middlewares import user_middleware, error_middleware # Import middlewares
from retoors.services.user_service import UserService # Import UserService
from retoors.routes import setup_routes # Import setup_routes
import aiohttp_jinja2 # Import aiohttp_jinja2
import jinja2 # Import jinja2
import aiohttp # Import aiohttp
from retoors.services.file_service import FileService # Import FileService
@pytest.fixture
def temp_user_files_dir(tmp_path):
"""Fixture to create a temporary directory for user files."""
user_files_dir = tmp_path / "user_files"
user_files_dir.mkdir()
return user_files_dir
@pytest.fixture
def temp_users_json(tmp_path):
"""Fixture to create a temporary users.json file."""
users_json_path = tmp_path / "users.json"
initial_users_data = [
{
"email": "test@example.com",
"full_name": "Test User",
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": None,
"shared_items": {}
},
{
"email": "child@example.com",
"full_name": "Child User",
"email": "child@example.com",
"password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": "test@example.com",
"shared_items": {}
}
]
with open(users_json_path, "w") as f:
json.dump(initial_users_data, f)
return users_json_path
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
"""Fixture to provide a FileService instance with temporary directories."""
return FileService(temp_user_files_dir, temp_users_json)
@pytest.fixture @pytest.fixture
@ -12,6 +65,42 @@ def create_app_instance():
"""Fixture to create a new aiohttp application instance.""" """Fixture to create a new aiohttp application instance."""
return create_app() return create_app()
@pytest.fixture
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance):
"""Fixture to create a test aiohttp application with mocked services."""
from aiohttp import web
app = web.Application()
# Setup session for the test app
project_root = Path(__file__).parent.parent
env_file_path = project_root / ".env"
secret_key = get_or_create_session_secret_key(env_file_path)
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
app.middlewares.append(error_middleware)
app.middlewares.append(user_middleware)
# Mock UserService
mock_user_service = mocker.MagicMock(spec=UserService)
# Mock scheduler
mock_scheduler = mocker.MagicMock()
mock_scheduler.spawn = mocker.AsyncMock()
mock_scheduler.close = mocker.AsyncMock()
app["user_service"] = mock_user_service
app["file_service"] = file_service_instance
app["scheduler"] = mock_scheduler
# Setup Jinja2 for templates
base_path = Path(__file__).parent.parent / "retoors"
templates_path = base_path / "templates"
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
setup_routes(app)
return app
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def mock_users_db_fixture(): def mock_users_db_fixture():
@ -39,9 +128,8 @@ def mock_users_db_fixture():
"storage_quota_gb": 50, "storage_quota_gb": 50,
"storage_used_gb": 5, "storage_used_gb": 5,
"parent_email": "admin@example.com", "parent_email": "admin@example.com",
"reset_token": None, "shared_items": {}
"reset_token_expiry": None, }
},
} }
@ -242,6 +330,118 @@ async def client(
config_file.unlink(missing_ok=True) # Use missing_ok for robustness config_file.unlink(missing_ok=True) # Use missing_ok for robustness
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
return client
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in admin user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "admin@example.com", "password": "password"}
)
return client
@pytest.fixture @pytest.fixture
def mock_send_email(mocker: MockerFixture): def mock_send_email(mocker: MockerFixture):
""" """

View File

@ -4,6 +4,7 @@ from pathlib import Path
import json import json
import datetime import datetime
import aiohttp import aiohttp
from aiohttp import web
from aiohttp.test_utils import TestClient from aiohttp.test_utils import TestClient
from aiohttp_session import setup as setup_session from aiohttp_session import setup as setup_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage from aiohttp_session.cookie_storage import EncryptedCookieStorage
@ -12,157 +13,9 @@ from retoors.helpers.env_manager import get_or_create_session_secret_key
# Assuming the FileService is in retoors/services/file_service.py # Assuming the FileService is in retoors/services/file_service.py
# and the FileBrowserView is in retoors/views/site.py # and the FileBrowserView is in retoors/views/site.py
@pytest.fixture
def temp_user_files_dir(tmp_path):
"""Fixture to create a temporary directory for user files."""
user_files_dir = tmp_path / "user_files"
user_files_dir.mkdir()
return user_files_dir
@pytest.fixture
def temp_users_json(tmp_path):
"""Fixture to create a temporary users.json file."""
users_json_path = tmp_path / "users.json"
initial_users_data = {
"test@example.com": {
"full_name": "Test User",
"email": "test@example.com",
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": None,
"shared_items": {}
},
"child@example.com": {
"full_name": "Child User",
"email": "child@example.com",
"password": "hashed_password",
"storage_quota_gb": 5,
"storage_used_gb": 0,
"parent_email": "test@example.com",
"shared_items": {}
}
}
with open(users_json_path, "w") as f:
json.dump(initial_users_data, f)
return users_json_path
@pytest.fixture
def file_service_instance(temp_user_files_dir, temp_users_json):
"""Fixture to provide a FileService instance with temporary directories."""
from retoors.services.file_service import FileService
return FileService(temp_user_files_dir, temp_users_json)
@pytest.fixture
async def logged_in_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 10,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Test User",
"is_admin": False,
"storage_quota_gb": 10,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Test User",
"email": "test@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "test@example.com", "password": "password"}
)
return client
@pytest.fixture
async def logged_in_admin_client(aiohttp_client, create_test_app, mocker):
"""Fixture to provide an aiohttp client with a logged-in admin user."""
app = create_test_app
client = await aiohttp_client(app)
user_service = app["user_service"]
def mock_create_user(full_name, email, password, parent_email=None):
return {
"full_name": full_name,
"email": email,
"password": "hashed_password",
"storage_quota_gb": 100,
"storage_used_gb": 0,
"parent_email": parent_email,
"shared_items": {}
}
def mock_authenticate_user(email, password):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
def mock_get_user_by_email(email):
return {
"email": email,
"full_name": "Admin User",
"is_admin": True,
"storage_quota_gb": 100,
"storage_used_gb": 0
}
mocker.patch.object(user_service, "create_user", side_effect=mock_create_user)
mocker.patch.object(user_service, "authenticate_user", side_effect=mock_authenticate_user)
mocker.patch.object(user_service, "get_user_by_email", side_effect=mock_get_user_by_email)
await client.post(
"/register",
data={
"full_name": "Admin User",
"email": "admin@example.com",
"password": "password",
"confirm_password": "password",
},
)
await client.post(
"/login", data={"email": "admin@example.com", "password": "password"}
)
return client
# --- FileService Tests --- # --- FileService Tests ---
@ -310,10 +163,10 @@ async def test_file_service_shared_link_expiry(file_service_instance, mocker):
await file_service_instance.upload_file(user_email, file_path, b"expiring content") await file_service_instance.upload_file(user_email, file_path, b"expiring content")
share_id = await file_service_instance.generate_share_link(user_email, file_path) share_id = await file_service_instance.generate_share_link(user_email, file_path)
# Mock datetime to simulate an expired link # Mock datetime to simulate an expired link (after generating the link)
future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8) future_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=8)
mocker.patch("datetime.datetime", MagicMock(wraps=datetime.datetime)) mock_datetime = mocker.patch('retoors.services.file_service.datetime', wraps=datetime)
datetime.datetime.now.return_value = future_time mock_datetime.datetime.now = mocker.Mock(return_value=future_time)
shared_item = await file_service_instance.get_shared_item(share_id) shared_item = await file_service_instance.get_shared_item(share_id)
assert shared_item is None assert shared_item is None
@ -355,6 +208,25 @@ async def test_file_browser_new_folder(logged_in_client: TestClient, file_servic
expected_path = temp_user_files_dir / user_email / "new_folder_via_web" expected_path = temp_user_files_dir / user_email / "new_folder_via_web"
assert expected_path.is_dir() assert expected_path.is_dir()
@pytest.mark.asyncio
async def test_file_browser_new_folder_missing_name(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": ""}, allow_redirects=False)
assert resp.status == 302
assert "error=Folder+name+is+required" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_new_folder_exists(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "existing_folder_web"
await file_service_instance.create_folder(user_email, folder_name) # Create it first
# Mock create_folder to return False, simulating it already exists or failed
mocker.patch.object(file_service_instance, "create_folder", return_value=False)
resp = await logged_in_client.post("/files/new_folder", data={"folder_name": folder_name}, allow_redirects=False)
assert resp.status == 302
assert f"error=Folder+'{folder_name}'+already+exists+or+could+not+be+created" in resp.headers["Location"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): async def test_file_browser_upload_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com" user_email = "test@example.com"
@ -368,8 +240,7 @@ async def test_file_browser_upload_file(logged_in_client: TestClient, file_servi
content_type='text/plain') content_type='text/plain')
resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False) resp = await logged_in_client.post("/files/upload", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect assert resp.status == 200
assert resp.headers["Location"].startswith("/files")
expected_path = temp_user_files_dir / user_email / "uploaded.txt" expected_path = temp_user_files_dir / user_email / "uploaded.txt"
assert expected_path.is_file() assert expected_path.is_file()
@ -389,8 +260,9 @@ async def test_file_browser_download_file(logged_in_client: TestClient, file_ser
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_file_browser_download_file_not_found(logged_in_client: TestClient): async def test_file_browser_download_file_not_found(logged_in_client: TestClient):
resp = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False) response = await logged_in_client.get("/files/download/nonexistent_web.txt", allow_redirects=False)
assert resp.status == 404 assert response.status == 404
assert "File not found" in await response.text()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir): async def test_file_browser_delete_file(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
@ -422,61 +294,272 @@ async def test_file_browser_delete_folder(logged_in_client: TestClient, file_ser
assert not expected_path.is_dir() assert not expected_path.is_dir()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_file_browser_share_file(logged_in_client: TestClient, file_service_instance): async def test_file_browser_delete_multiple_files(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com" user_email = "test@example.com"
file_name = "web_share.txt" file_names = ["multi_delete_1.txt", "multi_delete_2.txt", "multi_delete_3.txt"]
await file_service_instance.upload_file(user_email, file_name, b"shareable content") for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
paths_to_delete = [f"{name}" for name in file_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
for name in file_names:
expected_path = temp_user_files_dir / user_email / name
assert not expected_path.is_file()
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_folders(logged_in_client: TestClient, file_service_instance, temp_user_files_dir):
user_email = "test@example.com"
folder_names = ["multi_delete_folder_1", "multi_delete_folder_2"]
for name in folder_names:
await file_service_instance.create_folder(user_email, name)
await file_service_instance.upload_file(user_email, f"{name}/nested.txt", b"nested content")
paths_to_delete = [f"{name}" for name in folder_names]
# Construct FormData for multiple paths
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert resp.headers["Location"].startswith("/files")
for name in folder_names:
expected_path = temp_user_files_dir / user_email / name
assert not expected_path.is_dir()
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/delete_multiple", data={}, allow_redirects=False)
assert resp.status == 302
assert "error=No+items+selected+for+deletion" in resp.headers["Location"]
@pytest.mark.asyncio
async def test_file_browser_delete_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, temp_user_files_dir, mocker):
user_email = "test@example.com"
file_names = ["fail_delete_1.txt", "fail_delete_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content")
paths_to_delete = [f"{name}" for name in file_names]
# Mock delete_item to fail for the first item
original_delete_item = file_service_instance.delete_item
async def mock_delete_item(email, path):
if path == file_names[0]:
return False # Simulate failure for the first item
return await original_delete_item(email, path)
mocker.patch.object(file_service_instance, "delete_item", side_effect=mock_delete_item)
data = aiohttp.FormData()
for path in paths_to_delete:
data.add_field('paths[]', path)
resp = await logged_in_client.post("/files/delete_multiple", data=data, allow_redirects=False)
assert resp.status == 302 # Redirect
assert "error=Some+items+failed+to+delete" in resp.headers["Location"]
# Check if the first file still exists (failed to delete)
assert (temp_user_files_dir / user_email / file_names[0]).is_file()
# Check if the second file is deleted (succeeded)
assert not (temp_user_files_dir / user_email / file_names[1]).is_file()
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_no_paths(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share_multiple", json={"paths": []})
assert resp.status == 400
data = await resp.json()
assert data["error"] == "No items selected for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_missing_path(logged_in_client: TestClient):
resp = await logged_in_client.post("/files/share/", json={}) # No file_path in URL
assert resp.status == 400
data = await resp.json()
assert data["error"] == "File path is required for sharing"
@pytest.mark.asyncio
async def test_file_browser_share_file_fail_generate_link(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "fail_share_link.txt"
await file_service_instance.upload_file(user_email, file_name, b"content")
mocker.patch.object(file_service_instance, "generate_share_link", return_value=None)
resp = await logged_in_client.post(f"/files/share/{file_name}") resp = await logged_in_client.post(f"/files/share/{file_name}")
assert resp.status == 200 assert resp.status == 500
data = await resp.json() data = await resp.json()
assert "share_link" in data assert data["error"] == "Failed to generate share link"
assert "http" in data["share_link"] # Check if it's a URL
# Verify the shared item can be retrieved @pytest.mark.asyncio
# The share_link will be something like http://localhost:PORT/shared_file/SHARE_ID async def test_file_browser_delete_item_missing_path(logged_in_client: TestClient):
share_id = data["share_link"].split("/")[-1] resp = await logged_in_client.post("/files/delete/", allow_redirects=False) # No file_path in URL
shared_item = await file_service_instance.get_shared_item(share_id) assert resp.status == 302
assert shared_item is not None assert "error=Item+path+is+required+for+deletion" in resp.headers["Location"]
assert shared_item["item_path"] == file_name
@pytest.fixture @pytest.mark.asyncio
def create_test_app(mocker, temp_user_files_dir, temp_users_json, file_service_instance): async def test_file_browser_delete_item_fail(logged_in_client: TestClient, file_service_instance, mocker):
"""Fixture to create a test aiohttp application with mocked services.""" user_email = "test@example.com"
from aiohttp import web file_name = "fail_delete.txt"
from retoors.middlewares import user_middleware, error_middleware await file_service_instance.upload_file(user_email, file_name, b"content")
from retoors.services.user_service import UserService
from retoors.routes import setup_routes
import aiohttp_jinja2
import jinja2
app = web.Application() mocker.patch.object(file_service_instance, "delete_item", return_value=False)
# Setup session for the test app resp = await logged_in_client.post(f"/files/delete/{file_name}", allow_redirects=False)
project_root = Path(__file__).parent.parent assert resp.status == 302
env_file_path = project_root / ".env" assert "error=Failed+to+delete+item+-+it+may+not+exist" in resp.headers["Location"]
secret_key = get_or_create_session_secret_key(env_file_path)
setup_session(app, EncryptedCookieStorage(secret_key.decode("utf-8")))
app.middlewares.append(error_middleware) @pytest.mark.asyncio
app.middlewares.append(user_middleware) async def test_file_browser_download_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
folder_name = "shared_folder"
file_name = "nested.txt"
share_id = "test_share_id"
# Mock UserService mocker.patch.object(file_service_instance, "get_shared_item", return_value={
mock_user_service = mocker.MagicMock(spec=UserService) "user_email": user_email,
"item_path": folder_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: True))
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
# Mock scheduler resp = await client.get(f"/shared_file/{share_id}/download?file_path={file_name}")
mock_scheduler = mocker.MagicMock() assert resp.status == 404
mock_scheduler.spawn = mocker.AsyncMock() text = await resp.text()
mock_scheduler.close = mocker.AsyncMock() assert "Shared file not found or inaccessible within the shared folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_not_a_directory(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
app["user_service"] = mock_user_service mocker.patch.object(file_service_instance, "get_shared_item", return_value={
app["file_service"] = file_service_instance "user_email": user_email,
app["scheduler"] = mock_scheduler "item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch.object(file_service_instance, "_get_user_file_path", return_value=mocker.MagicMock(is_dir=lambda: False))
resp = await client.get(f"/shared_file/{share_id}/download?file_path=some_file.txt")
assert resp.status == 400
text = await resp.text()
assert "Cannot download specific files from a shared item that is not a folder." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_shared_item_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id/download?file_path=some_file.txt")
assert resp.status == 404
text = await resp.text()
assert "Shared link is invalid or has expired." in text
@pytest.mark.asyncio
async def test_file_browser_download_shared_file_handler_missing_file_path(client: TestClient):
resp = await client.get("/shared_file/some_share_id/download")
assert resp.status == 400
assert "File path is required for download from shared folder." in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_fail_get_content(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_name = "shared_file.txt"
share_id = "test_share_id"
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": file_name,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
mocker.patch("pathlib.Path.is_file", return_value=True) # Simulate it's a file
mocker.patch.object(file_service_instance, "get_shared_file_content", return_value=None)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared file not found or inaccessible" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_neither_file_nor_dir(client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
item_path = "mystery_item"
share_id = "test_share_id"
mocker.patch.object(file_service_instance, "get_shared_item", return_value={
"user_email": user_email,
"item_path": item_path,
"share_id": share_id,
"created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"expires_at": (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).isoformat()
})
# Mock Path.is_file and Path.is_dir to return False
mocker.patch("pathlib.Path.is_file", return_value=False)
mocker.patch("pathlib.Path.is_dir", return_value=False)
resp = await client.get(f"/shared_file/{share_id}")
assert resp.status == 404
text = await resp.text()
assert "Shared item not found" in text
@pytest.mark.asyncio
async def test_file_browser_shared_file_handler_not_found(client: TestClient, file_service_instance, mocker):
mocker.patch.object(file_service_instance, "get_shared_item", return_value=None)
resp = await client.get("/shared_file/nonexistent_share_id")
assert resp.status == 404
text = await resp.text()
assert "Shared link is invalid or has expired." in text
@pytest.mark.asyncio
async def test_file_browser_unknown_post_action(logged_in_client: TestClient, mocker):
# Mock the route name to simulate an unknown action
mock_route = mocker.MagicMock(name="unknown_action")
mock_route.current_app = logged_in_client.app # Provide a mock current_app
mocker.patch("aiohttp.web_request.Request.match_info", new_callable=mocker.PropertyMock, return_value={"route": mock_route})
resp = await logged_in_client.post("/files/some_unknown_action", allow_redirects=False)
assert resp.status == 400
assert "Unknown file action" in await resp.text()
@pytest.mark.asyncio
async def test_file_browser_share_multiple_items_some_fail(logged_in_client: TestClient, file_service_instance, mocker):
user_email = "test@example.com"
file_names = ["fail_share_1.txt", "fail_share_2.txt"]
for name in file_names:
await file_service_instance.upload_file(user_email, name, b"content to share")
paths_to_share = [f"{name}" for name in file_names]
# Mock generate_share_link to fail for the first item
original_generate_share_link = file_service_instance.generate_share_link
async def mock_generate_share_link(email, path):
if path == file_names[0]:
return None # Simulate failure for the first item
return await original_generate_share_link(email, path)
mocker.patch.object(file_service_instance, "generate_share_link", side_effect=mock_generate_share_link)
resp = await logged_in_client.post("/files/share_multiple", json={"paths": paths_to_share})
assert resp.status == 200 # Expect 200 even if some fail, as long as at least one succeeds
data = await resp.json()
assert "share_links" in data
assert len(data["share_links"]) == 1 # Only one link should be generated
assert data["share_links"][0]["name"] == file_names[1] # The successful one
# Setup Jinja2 for templates
base_path = Path(__file__).parent.parent / "retoors"
templates_path = base_path / "templates"
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(str(templates_path)))
setup_routes(app)
return app

View File

@ -133,34 +133,44 @@ async def test_privacy_get(client):
assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text assert "This Privacy Policy describes how Retoor's Cloud Solutions collects, uses, and discloses your information" in text
async def test_shared_get_unauthorized(client): async def test_shared_get_authorized(logged_in_client):
resp = await client.get("/shared", allow_redirects=False) resp = await logged_in_client.get("/shared")
assert resp.status == 302 assert resp.status == 200
assert resp.headers["Location"] == "/login" text = await resp.text()
assert "Shared with me" in text
assert "Files and folders that have been shared with you will appear here." in text
async def test_recent_get_unauthorized(client): async def test_recent_get_authorized(logged_in_client):
resp = await client.get("/recent", allow_redirects=False) resp = await logged_in_client.get("/recent")
assert resp.status == 302 assert resp.status == 200
assert resp.headers["Location"] == "/login" text = await resp.text()
assert "Recent Files" in text
assert "Your recently accessed files will appear here." in text
async def test_favorites_get_unauthorized(client): async def test_favorites_get_authorized(logged_in_client):
resp = await client.get("/favorites", allow_redirects=False) resp = await logged_in_client.get("/favorites")
assert resp.status == 302 assert resp.status == 200
assert resp.headers["Location"] == "/login" text = await resp.text()
assert "Favorites" in text
assert "Your favorite files and folders will appear here." in text
async def test_trash_get_unauthorized(client): async def test_trash_get_authorized(logged_in_client):
resp = await client.get("/trash", allow_redirects=False) resp = await logged_in_client.get("/trash")
assert resp.status == 302 assert resp.status == 200
assert resp.headers["Location"] == "/login" text = await resp.text()
assert "Trash" in text
assert "Files and folders you have deleted will appear here." in text
async def test_file_browser_get_unauthorized(client): async def test_users_get_authorized(logged_in_client):
resp = await client.get("/files", allow_redirects=False) resp = await logged_in_client.get("/users")
assert resp.status == 302 assert resp.status == 200
assert resp.headers["Location"] == "/login" text = await resp.text()
assert "User Management" in text
assert "+ Add New User" in text
async def test_file_browser_get_authorized(client): async def test_file_browser_get_authorized(client):