import aiofiles
import json
from pathlib import Path
import shutil
import uuid
import datetime
import logging
import hashlib
import os
from .storage_service import StorageService
from .sharing_service import SharingService
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
class FileService:
def __init__(self, base_dir: Path, user_service):
self.base_dir = base_dir
self.user_service = user_service
self.storage = StorageService()
self.sharing_service = SharingService()
self.base_dir.mkdir(parents=True, exist_ok=True)
self.drives_dir = self.base_dir / "drives"
self.drives_dir.mkdir(exist_ok=True)
self.drives = ["drive1", "drive2", "drive3"]
for drive in self.drives:
(self.drives_dir / drive).mkdir(exist_ok=True)
self.drive_counter = 0
logger.info(f"FileService initialized with base_dir: {self.base_dir} and user_service")
def _choose_drive(self):
drive = self.drives[self.drive_counter % len(self.drives)]
self.drive_counter += 1
return drive
async def _load_metadata(self, user_email: str):
metadata = await self.storage.load(user_email, "file_metadata")
return metadata if metadata else {}
async def _save_metadata(self, user_email: str, metadata: dict):
await self.storage.save(user_email, "file_metadata", metadata)
def get_user_file_system_path(self, user_email: str, item_path: str) -> Path:
"""
Constructs the absolute file system path for a user's item.
This is for internal use to resolve virtual paths to physical paths.
"""
user_base_path = self.storage._get_user_base_path(user_email)
# Ensure item_path is treated as relative to user_base_path
# and prevent directory traversal attacks.
full_path = user_base_path / item_path
if not self.storage._validate_path(full_path, user_base_path):
raise ValueError("Invalid item path: directory traversal detected")
return full_path
async def list_files(self, user_email: str, path: str = "") -> list:
"""Lists files and directories for a given user within a specified path."""
metadata = await self._load_metadata(user_email)
# Normalize path
if path and not path.endswith('/'):
path += '/'
# Update last_accessed for the folder if path is not root
if path:
folder_path = path.rstrip('/')
if folder_path in metadata and metadata[folder_path].get("type") == "dir":
metadata[folder_path]["last_accessed"] = datetime.datetime.now().isoformat()
await self._save_metadata(user_email, metadata)
items = []
seen = set()
for item_path, item_meta in metadata.items():
if item_path.startswith(path):
remaining = item_path[len(path):].rstrip('/')
if '/' not in remaining and remaining not in seen:
seen.add(remaining)
if remaining: # not the path itself
items.append({
"name": remaining,
"is_dir": item_meta.get("type") == "dir",
"path": item_path,
"size": item_meta.get("size", 0),
"last_modified": item_meta.get("modified_at", ""),
})
logger.debug(f"Listed {len(items)} items for user '{user_email}' in path '{path}'")
return sorted(items, key=lambda x: (not x["is_dir"], x["name"].lower()))
async def create_folder(self, user_email: str, folder_path: str) -> bool:
"""Creates a new folder for the user."""
metadata = await self._load_metadata(user_email)
if folder_path in metadata:
logger.warning(f"create_folder: Folder already exists: {folder_path}")
return False
metadata[folder_path] = {
"type": "dir",
"created_at": datetime.datetime.now().isoformat(),
"modified_at": datetime.datetime.now().isoformat(),
"last_accessed": datetime.datetime.now().isoformat(),
}
await self._save_metadata(user_email, metadata)
logger.info(f"create_folder: Folder created: {folder_path}")
return True
async def upload_file(self, user_email: str, file_path: str, content: bytes) -> bool:
"""Uploads a file for the user."""
hash = hashlib.sha256(content).hexdigest()
drive = self._choose_drive()
dir1 = hash[:3]
dir2 = hash[3:6]
dir3 = hash[6:9]
blob_path = self.drives_dir / drive / dir1 / dir2 / dir3 / hash
blob_path.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(blob_path, 'wb') as f:
await f.write(content)
metadata = await self._load_metadata(user_email)
metadata[file_path] = {
"type": "file",
"size": len(content),
"hash": hash,
"blob_location": {"drive": drive, "path": f"{dir1}/{dir2}/{dir3}/{hash}"},
"created_at": datetime.datetime.now().isoformat(),
"modified_at": datetime.datetime.now().isoformat(),
"last_accessed": datetime.datetime.now().isoformat(),
}
await self._save_metadata(user_email, metadata)
logger.info(f"upload_file: File uploaded to drive {drive}: {file_path}")
return True
async def download_file(self, user_email: str, file_path: str) -> tuple[bytes, str] | None:
"""Downloads a file for the user."""
metadata = await self._load_metadata(user_email)
if file_path not in metadata or metadata[file_path]["type"] != "file":
logger.warning(f"download_file: File not found in metadata: {file_path}")
return None
item_meta = metadata[file_path]
# Update last_accessed
item_meta["last_accessed"] = datetime.datetime.now().isoformat()
await self._save_metadata(user_email, metadata)
blob_loc = item_meta["blob_location"]
blob_path = self.drives_dir / blob_loc["drive"] / blob_loc["path"]
if not blob_path.exists():
logger.warning(f"download_file: Blob not found: {blob_path}")
return None
async with aiofiles.open(blob_path, 'rb') as f:
content = await f.read()
logger.info(f"download_file: Successfully read file: {file_path}")
return content, Path(file_path).name
async def read_file_content_binary(self, user_email: str, file_path: str) -> str | None:
"""Reads file content as text for editing."""
metadata = await self._load_metadata(user_email)
if file_path not in metadata or metadata[file_path]["type"] != "file":
logger.warning(f"read_file_content: File not found in metadata: {file_path}")
return None
item_meta = metadata[file_path]
# Update last_accessed
item_meta["last_accessed"] = datetime.datetime.now().isoformat()
await self._save_metadata(user_email, metadata)
blob_loc = item_meta["blob_location"]
blob_path = self.drives_dir / blob_loc["drive"] / blob_loc["path"]
if not blob_path.exists():
logger.warning(f"read_file_content: Blob not found: {blob_path}")
return None
try:
async with aiofiles.open(blob_path, 'rb') as f:
content = await f.read()
logger.info(f"read_file_content: Successfully read file: {file_path}")
return content
except UnicodeDecodeError:
logger.warning(f"read_file_content: File is not a text file: {file_path}")
return None
async def read_file_content(self, user_email: str, file_path: str) -> str | None:
"""Reads file content as text for editing."""
metadata = await self._load_metadata(user_email)
if file_path not in metadata or metadata[file_path]["type"] != "file":
logger.warning(f"read_file_content: File not found in metadata: {file_path}")
return None
item_meta = metadata[file_path]
# Update last_accessed
item_meta["last_accessed"] = datetime.datetime.now().isoformat()
await self._save_metadata(user_email, metadata)
blob_loc = item_meta["blob_location"]
blob_path = self.drives_dir / blob_loc["drive"] / blob_loc["path"]
if not blob_path.exists():
logger.warning(f"read_file_content: Blob not found: {blob_path}")
return None
try:
async with aiofiles.open(blob_path, 'r', encoding='utf-8') as f:
content = await f.read()
logger.info(f"read_file_content: Successfully read file: {file_path}")
return content
except UnicodeDecodeError:
logger.warning(f"read_file_content: File is not a text file: {file_path}")
return None
async def save_file_content(self, user_email: str, file_path: str, content: str) -> bool:
"""Saves file content from editor."""
try:
content_bytes = content.encode('utf-8')
success = await self.upload_file(user_email, file_path, content_bytes)
if success:
logger.info(f"save_file_content: Successfully saved file: {file_path}")
return success
except Exception as e:
logger.error(f"save_file_content: Error saving file {file_path}: {e}")
return False
async def delete_item(self, user_email: str, item_path: str) -> bool:
"""Deletes a file or folder for the user."""
metadata = await self._load_metadata(user_email)
if item_path not in metadata:
logger.warning(f"delete_item: Item not found: {item_path}")
return False
# If dir, remove all under it
to_delete = [p for p in metadata if p == item_path or p.startswith(item_path + '/')]
for p in to_delete:
del metadata[p]
await self._save_metadata(user_email, metadata)
logger.info(f"delete_item: Item deleted: {item_path}")
return True
async def generate_share_link(
self,
user_email: str,
item_path: str,
permission: str = "view",
scope: str = "public",
password: str = None,
expiration_days: int = None,
disable_download: bool = False,
recipient_emails: list = None
) -> str | None:
logger.debug(f"generate_share_link: Generating link for user '{user_email}', item '{item_path}'")
metadata = await self._load_metadata(user_email)
if item_path not in metadata:
logger.warning(f"generate_share_link: Item does not exist: {item_path}")
return None
share_id = await self.sharing_service.create_share(
owner_email=user_email,
item_path=item_path,
permission=permission,
scope=scope,
password=password,
expiration_days=expiration_days,
disable_download=disable_download,
recipient_emails=recipient_emails
)
logger.info(f"generate_share_link: Share link generated with ID: {share_id} for item: {item_path}")
return share_id
async def get_shared_item(self, share_id: str, password: str = None, accessor_email: str = None) -> dict | None:
logger.debug(f"get_shared_item: Retrieving shared item with ID: {share_id}")
share = await self.sharing_service.get_share(share_id)
if not share:
logger.warning(f"get_shared_item: No valid shared item found for ID: {share_id}")
return None
if not await self.sharing_service.verify_share_access(share_id, password, accessor_email):
logger.warning(f"get_shared_item: Access denied for share {share_id}")
return None
await self.sharing_service.record_share_access(share_id, accessor_email)
logger.info(f"get_shared_item: Found valid shared item for ID: {share_id}")
return share
async def get_shared_file_content(self, share_id: str, password: str = None, accessor_email: str = None, requested_file_path: str = None) -> tuple[bytes, str] | None:
logger.debug(f"get_shared_file_content: Retrieving content for shared file with ID: {share_id}, requested_file_path: {requested_file_path}")
shared_item = await self.get_shared_item(share_id, password, accessor_email)
if not shared_item:
return None
if shared_item.get("disable_download", False):
logger.warning(f"get_shared_file_content: Download disabled for share {share_id}")
return None
user_email = shared_item["owner_email"]
item_path = shared_item["item_path"]
target_path = item_path
if requested_file_path:
target_path = requested_file_path
if not target_path.startswith(item_path + '/') and target_path != item_path:
logger.warning(f"get_shared_file_content: Requested file path '{requested_file_path}' is not within shared item path '{item_path}' for share_id: {share_id}")
return None
return await self.download_file(user_email, target_path)
async def get_shared_folder_content(self, share_id: str, password: str = None, accessor_email: str = None) -> list | None:
logger.debug(f"get_shared_folder_content: Retrieving content for shared folder with ID: {share_id}")
shared_item = await self.get_shared_item(share_id, password, accessor_email)
if not shared_item:
return None
user_email = shared_item["owner_email"]
item_path = shared_item["item_path"]
metadata = await self._load_metadata(user_email)
if item_path not in metadata or metadata[item_path]["type"] != "dir":
logger.warning(f"get_shared_folder_content: Shared item is not a directory: {item_path}")
return None
logger.info(f"get_shared_folder_content: Listing files for shared folder: {item_path}")
return await self.list_files(user_email, item_path)
async def migrate_old_files(self, user_email: str):
"""Migrate existing files from old file system to virtual system."""
old_user_dir = self.base_dir / user_email
if not old_user_dir.exists():
logger.info(f"No old files to migrate for {user_email}")
return
metadata = await self._load_metadata(user_email)
migrated_count = 0
for root, dirs, files in os.walk(old_user_dir):
rel_root = Path(root).relative_to(old_user_dir)
for dir_name in dirs:
dir_path = str(rel_root / dir_name) if str(rel_root) != "." else dir_name
if dir_path not in metadata:
metadata[dir_path] = {
"type": "dir",
"created_at": datetime.datetime.now().isoformat(),
"modified_at": datetime.datetime.now().isoformat(),
"last_accessed": datetime.datetime.now().isoformat(),
}
migrated_count += 1
for file_name in files:
file_path = rel_root / file_name
str_file_path = str(file_path) if str(rel_root) != "." else file_name
if str_file_path in metadata:
continue # Already migrated
full_file_path = old_user_dir / file_path
with open(full_file_path, 'rb') as f:
content = f.read()
hash = hashlib.sha256(content).hexdigest()
drive = self._choose_drive()
dir1 = hash[:3]
dir2 = hash[3:6]
dir3 = hash[6:9]
blob_path = self.drives_dir / drive / dir1 / dir2 / dir3 / hash
if not blob_path.exists():
blob_path.parent.mkdir(parents=True, exist_ok=True)
with open(blob_path, 'wb') as f:
f.write(content)
metadata[str_file_path] = {
"type": "file",
"size": len(content),
"hash": hash,
"blob_location": {"drive": drive, "path": f"{dir1}/{dir2}/{dir3}/{hash}"},
"created_at": datetime.datetime.fromtimestamp(full_file_path.stat().st_ctime).isoformat(),
"modified_at": datetime.datetime.fromtimestamp(full_file_path.stat().st_mtime).isoformat(),
"last_accessed": datetime.datetime.fromtimestamp(full_file_path.stat().st_atime).isoformat(),
}
migrated_count += 1
await self._save_metadata(user_email, metadata)
logger.info(f"Migrated {migrated_count} items for {user_email}")
# Optionally remove old dir
# shutil.rmtree(old_user_dir)
async def get_recent_files(self, user_email: str, limit: int = 50) -> list:
"""Gets the most recently accessed files and folders for the user."""
metadata = await self._load_metadata(user_email)
items = []
for path, meta in metadata.items():
if meta.get("type") in ("file", "dir"):
last_accessed = meta.get("last_accessed", meta.get("modified_at", ""))
items.append({
"path": path,
"name": Path(path).name,
"is_dir": meta["type"] == "dir",
"size": meta.get("size", 0) if meta["type"] == "file" else 0,
"last_accessed": last_accessed,
})
# Sort by last_accessed descending
items.sort(key=lambda x: x["last_accessed"], reverse=True)
return items[:limit]