import hashlib
import logging
import mimetypes
import os
from datetime import datetime
from typing import List, Optional
from urllib.parse import quote

from fastapi import (
    APIRouter,
    Depends,
    UploadFile,
    File as FastAPIFile,
    HTTPException,
    status,
    Form,
)
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from ..auth import get_current_user
from ..models import User, File, Folder
from ..schemas import FileOut
from ..storage import storage_manager
from ..activity import log_activity
from ..thumbnails import generate_thumbnail, delete_thumbnail
# Router for the file endpoints: every route registered on it is declared
# with the "/files" prefix and the "files" tag.
router = APIRouter(prefix="/files", tags=["files"])
class FileMove(BaseModel):
    # Request body of POST /{file_id}/move (consumed by move_file).
    # Id of the destination folder. When omitted (or falsy) move_file skips
    # the folder lookup and sets the file's parent to None.
    target_folder_id: Optional[int] = None
class FileRename(BaseModel):
    # Request body of POST /{file_id}/rename (consumed by rename_file).
    # Name the file should carry afterwards; rename_file rejects it with 409
    # when another live file in the same folder already uses it.
    new_name: str
class FileCopy(BaseModel):
    # Request body of POST /{file_id}/copy (consumed by copy_file).
    # Id of the folder that receives the copy. When omitted (or falsy)
    # copy_file creates the copy with parent None.
    target_folder_id: Optional[int] = None
class BatchFileOperation(BaseModel):
    # Describes one batch request handled by batch_file_operations.
    # Ids of the files the operation is applied to, processed in list order.
    file_ids: List[int]
    # One of "delete", "star", "unstar", "move", "copy"; any other value is
    # rejected with 400 by batch_file_operations.
    operation: str  # e.g., "delete", "star", "unstar", "move", "copy"
class BatchMoveCopyPayload(BaseModel):
    # Extra argument for the "move" and "copy" batch operations.
    # Destination folder id. batch_file_operations reports
    # "Target folder not specified" for each file when this is None.
    target_folder_id: Optional[int] = None
class FileContentUpdate(BaseModel):
    # Request body of PUT /{file_id}/content (consumed by update_file_content).
    # Full replacement text of the file; update_file_content encodes it as
    # UTF-8 before storing it.
    content: str
@router.post("/upload", response_model=FileOut, status_code=status.HTTP_201_CREATED)
async def upload_file(
    file: UploadFile = FastAPIFile(...),
    folder_id: Optional[int] = Form(None),
    current_user: User = Depends(get_current_user),
):
    """Store an uploaded file for the current user and create its File row.

    The bytes are saved under a content-addressed name (SHA-256 hex digest
    plus the original extension), the user's ``used_storage_bytes`` is
    increased by the upload size, and a thumbnail path is recorded when
    ``generate_thumbnail`` returns one.

    Args:
        file: The multipart upload.
        folder_id: Optional id of the folder to place the file in. A missing
            or falsy value creates the file with ``parent=None``.
        current_user: Authenticated owner of the new file.

    Returns:
        The created file serialized as ``FileOut``.

    Raises:
        HTTPException: 400 if the upload has no usable file name, 404 if the
            folder does not exist for this user, 409 if a live file with the
            same name already exists in that folder, 507 if the upload would
            exceed the user's storage quota.
    """
    # UploadFile.filename is optional. Without this guard a nameless upload
    # would crash in os.path.splitext() below (or create a row with an empty
    # name) instead of producing a client error.
    if not file.filename or not file.filename.strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Uploaded file must have a name",
        )

    if folder_id:
        parent_folder = await Folder.get_or_none(
            id=folder_id, owner=current_user, is_deleted=False
        )
        if not parent_folder:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Folder not found"
            )
    else:
        parent_folder = None

    existing_file = await File.get_or_none(
        name=file.filename, parent=parent_folder, owner=current_user, is_deleted=False
    )
    if existing_file:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="File with this name already exists in the current folder",
        )

    file_content = await file.read()
    file_size = len(file_content)
    file_hash = hashlib.sha256(file_content).hexdigest()

    if current_user.used_storage_bytes + file_size > current_user.storage_quota_bytes:
        raise HTTPException(
            status_code=status.HTTP_507_INSUFFICIENT_STORAGE,
            detail="Storage quota exceeded",
        )

    # Content-addressed storage name: identical content with the same
    # extension maps to the same path for this user.
    file_extension = os.path.splitext(file.filename)[1]
    storage_path = f"{file_hash}{file_extension}"
    await storage_manager.save_file(current_user.id, storage_path, file_content)

    # Fall back to a generic binary type when the name gives no hint.
    mime_type, _ = mimetypes.guess_type(file.filename)
    if not mime_type:
        mime_type = "application/octet-stream"

    db_file = await File.create(
        name=file.filename,
        path=storage_path,
        size=file_size,
        mime_type=mime_type,
        file_hash=file_hash,
        owner=current_user,
        parent=parent_folder,
    )

    current_user.used_storage_bytes += file_size
    await current_user.save()

    thumbnail_path = await generate_thumbnail(storage_path, mime_type, current_user.id)
    if thumbnail_path:
        db_file.thumbnail_path = thumbnail_path
        await db_file.save()

    return await FileOut.from_tortoise_orm(db_file)
@router.get("/download/{file_id}")
async def download_file(file_id: int, current_user: User = Depends(get_current_user)):
    """Stream the content of one of the current user's files as an attachment.

    Also stamps ``last_accessed_at`` on the file row before streaming.

    Args:
        file_id: Id of the file to download.
        current_user: Authenticated owner of the file.

    Returns:
        A ``StreamingResponse`` carrying the stored bytes with the file's
        recorded MIME type and a ``Content-Disposition: attachment`` header.

    Raises:
        HTTPException: 404 if no live file with this id belongs to the user,
            or if the storage backend raises ``FileNotFoundError`` when the
            first chunk is read.
    """
    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    db_file.last_accessed_at = datetime.now()
    await db_file.save()

    # Read the first chunk *before* building the response. An async generator
    # body only runs once the response is being sent, so a FileNotFoundError
    # raised lazily could never be turned into a 404 from here.
    has_data = True
    first_chunk = None
    chunks = None
    try:
        chunks = storage_manager.get_file(current_user.id, db_file.path).__aiter__()
        first_chunk = await chunks.__anext__()
    except StopAsyncIteration:
        # Storage yielded nothing: respond with an empty body.
        has_data = False
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found in storage"
        ) from exc

    async def file_iterator():
        if not has_data:
            return
        yield first_chunk
        async for chunk in chunks:
            yield chunk

    # The name is user-controlled. Quotes, backslashes, control characters
    # and non-ASCII text must not reach the quoted `filename` parameter, so
    # that one gets a sanitized ASCII fallback and the exact name travels
    # percent-encoded in `filename*` (RFC 5987 / RFC 6266).
    ascii_fallback = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in db_file.name
    )
    content_disposition = (
        f'attachment; filename="{ascii_fallback}"; '
        f"filename*=UTF-8''{quote(db_file.name, safe='')}"
    )

    return StreamingResponse(
        file_iterator(),
        media_type=db_file.mime_type,
        headers={"Content-Disposition": content_disposition},
    )
@router.delete("/{file_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_file(file_id: int, current_user: User = Depends(get_current_user)):
    """Soft-delete one of the current user's files.

    The row is kept: ``is_deleted`` is set and ``deleted_at`` is stamped, so
    the file shows up in ``list_deleted_files`` and can be brought back with
    ``restore_file``. The file's thumbnail is removed via ``delete_thumbnail``.

    Args:
        file_id: Id of the file to delete.
        current_user: Authenticated owner of the file.

    Raises:
        HTTPException: 404 if no live file with this id belongs to the user.
    """
    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    db_file.is_deleted = True
    db_file.deleted_at = datetime.now()
    await db_file.save()
    await delete_thumbnail(db_file.id)

    # Record the deletion like every other single-file mutation in this
    # module does; the action name mirrors the batch variant
    # ("file_deleted_batch") without the suffix.
    await log_activity(
        user=current_user, action="file_deleted", target_type="file", target_id=file_id
    )
    return
@router.post("/{file_id}/move", response_model=FileOut)
async def move_file(
    file_id: int, move_data: FileMove, current_user: User = Depends(get_current_user)
):
    """Change the parent folder of one of the current user's files.

    Args:
        file_id: Id of the file to move.
        move_data: Carries ``target_folder_id``; a missing or falsy value
            sets the file's parent to ``None``.
        current_user: Authenticated owner of the file.

    Returns:
        The moved file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if the file or the target folder is not found for
            this user, 409 if a different live file with the same name
            already sits in the destination.
    """
    source = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not source:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    # Resolve the destination; it stays None when no folder id was supplied.
    destination = None
    requested_folder_id = move_data.target_folder_id
    if requested_folder_id:
        destination = await Folder.get_or_none(
            id=requested_folder_id, owner=current_user, is_deleted=False
        )
        if not destination:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Target folder not found"
            )

    # A hit on the file itself (move within the same folder) is not a clash.
    clash = await File.get_or_none(
        name=source.name, parent=destination, owner=current_user, is_deleted=False
    )
    if clash and clash.id != file_id:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="File with this name already exists in target folder",
        )

    source.parent = destination
    await source.save()
    await log_activity(
        user=current_user,
        action="file_moved",
        target_type="file",
        target_id=file_id,
    )
    return await FileOut.from_tortoise_orm(source)
@router.post("/{file_id}/rename", response_model=FileOut)
async def rename_file(
    file_id: int,
    rename_data: FileRename,
    current_user: User = Depends(get_current_user),
):
    """Rename one of the current user's files within its current folder.

    Args:
        file_id: Id of the file to rename.
        rename_data: Carries the requested ``new_name``.
        current_user: Authenticated owner of the file.

    Returns:
        The renamed file serialized as ``FileOut``.

    Raises:
        HTTPException: 400 if the new name is empty or whitespace only, 404
            if no live file with this id belongs to the user, 409 if a
            different live file in the same folder already has that name.
    """
    # An empty or whitespace-only name would leave the file without a usable
    # label; reject it instead of storing it.
    if not rename_data.new_name.strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File name must not be empty",
        )

    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    # Sibling lookup goes through the raw parent_id column, so the parent
    # relation does not need to be fetched.
    existing_file = await File.get_or_none(
        name=rename_data.new_name,
        parent_id=db_file.parent_id,
        owner=current_user,
        is_deleted=False,
    )
    if existing_file and existing_file.id != file_id:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="File with this name already exists in the same folder",
        )

    db_file.name = rename_data.new_name
    await db_file.save()
    await log_activity(
        user=current_user, action="file_renamed", target_type="file", target_id=file_id
    )
    return await FileOut.from_tortoise_orm(db_file)
@router.post(
    "/{file_id}/copy", response_model=FileOut, status_code=status.HTTP_201_CREATED
)
async def copy_file(
    file_id: int, copy_data: FileCopy, current_user: User = Depends(get_current_user)
):
    """Create a second File row that points at the same stored content.

    The copy reuses the original's ``path``, ``size``, ``mime_type`` and
    ``file_hash``; no bytes are written to storage here. If the original name
    is taken in the destination, ``" (copy N)"`` is inserted before the
    extension, with N counting up from 1 until a free name is found.

    Args:
        file_id: Id of the file to copy.
        copy_data: Carries ``target_folder_id``; a missing or falsy value
            creates the copy with ``parent=None``.
        current_user: Authenticated owner of both files.

    Returns:
        The newly created file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if the file or the target folder is not found for
            this user.
    """
    original = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not original:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    destination = None
    requested_folder_id = copy_data.target_folder_id
    if requested_folder_id:
        destination = await Folder.get_or_none(
            id=requested_folder_id, owner=current_user, is_deleted=False
        )
        if not destination:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Target folder not found"
            )

    # Probe candidate names until one is unused in the destination.
    stem, extension = os.path.splitext(original.name)
    candidate = original.name
    attempt = 0
    while True:
        taken = await File.get_or_none(
            name=candidate, parent=destination, owner=current_user, is_deleted=False
        )
        if not taken:
            break
        attempt += 1
        candidate = f"{stem} (copy {attempt}){extension}"

    duplicate = await File.create(
        name=candidate,
        path=original.path,
        size=original.size,
        mime_type=original.mime_type,
        file_hash=original.file_hash,
        owner=current_user,
        parent=destination,
    )
    await log_activity(
        user=current_user,
        action="file_copied",
        target_type="file",
        target_id=duplicate.id,
    )
    return await FileOut.from_tortoise_orm(duplicate)
@router.get("/", response_model=List[FileOut])
async def list_files(
    folder_id: Optional[int] = None, current_user: User = Depends(get_current_user)
):
    """List the current user's live files in one folder, ordered by name.

    Args:
        folder_id: Folder to list. A missing or falsy value lists the files
            whose parent is ``None``.
        current_user: Authenticated owner of the files.

    Returns:
        The matching files serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if ``folder_id`` is given but no such live folder
            belongs to the user.
    """
    # Both cases run the same query; only the parent value differs.
    parent_folder = None
    if folder_id:
        parent_folder = await Folder.get_or_none(
            id=folder_id, owner=current_user, is_deleted=False
        )
        if not parent_folder:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Folder not found"
            )

    rows = await File.filter(
        parent=parent_folder, owner=current_user, is_deleted=False
    ).order_by("name")

    listing = []
    for row in rows:
        listing.append(await FileOut.from_tortoise_orm(row))
    return listing
@router.get("/thumbnail/{file_id}")
async def get_thumbnail(file_id: int, current_user: User = Depends(get_current_user)):
    """Stream the thumbnail of one of the current user's files.

    Stamps ``last_accessed_at`` on the file. When the row has no
    ``thumbnail_path`` yet, ``generate_thumbnail`` is called and a returned
    path is stored on the row before streaming.

    Args:
        file_id: Id of the file whose thumbnail is requested.
        current_user: Authenticated owner of the file.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg``.

    Raises:
        HTTPException: 404 if the file is not found for this user, if no
            thumbnail could be generated, or if the storage backend raises
            ``FileNotFoundError`` when the first chunk is read.
    """
    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    db_file.last_accessed_at = datetime.now()
    await db_file.save()

    thumbnail_path = getattr(db_file, "thumbnail_path", None)
    if not thumbnail_path:
        # No thumbnail recorded yet: try to create one on demand.
        thumbnail_path = await generate_thumbnail(
            db_file.path, db_file.mime_type, current_user.id
        )
        if not thumbnail_path:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Thumbnail not available"
            )
        db_file.thumbnail_path = thumbnail_path
        await db_file.save()

    # Read the first chunk *before* building the response. An async generator
    # body only runs once the response is being sent, so a FileNotFoundError
    # raised lazily could never be turned into a 404 from here.
    has_data = True
    first_chunk = None
    chunks = None
    try:
        chunks = storage_manager.get_file(current_user.id, thumbnail_path).__aiter__()
        first_chunk = await chunks.__anext__()
    except StopAsyncIteration:
        # Storage yielded nothing: respond with an empty body.
        has_data = False
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Thumbnail not found in storage",
        ) from exc

    async def thumbnail_iterator():
        if not has_data:
            return
        yield first_chunk
        async for chunk in chunks:
            yield chunk

    return StreamingResponse(thumbnail_iterator(), media_type="image/jpeg")
@router.get("/photos", response_model=List[FileOut])
async def list_photos(current_user: User = Depends(get_current_user)):
    """List the current user's live image files, newest first.

    A file counts as an image when its ``mime_type`` starts with ``image/``
    (case-insensitive); results are ordered by ``created_at`` descending.

    Args:
        current_user: Authenticated owner of the files.

    Returns:
        The matching files serialized as ``FileOut``.
    """
    image_query = File.filter(
        owner=current_user, is_deleted=False, mime_type__istartswith="image/"
    )
    images = await image_query.order_by("-created_at")

    serialized = []
    for image in images:
        serialized.append(await FileOut.from_tortoise_orm(image))
    return serialized
@router.get("/recent", response_model=List[FileOut])
async def list_recent_files(
    current_user: User = Depends(get_current_user), limit: int = 10
):
    """List the current user's most recently accessed live files.

    Only files with a non-null ``last_accessed_at`` are considered; they are
    returned most recent first.

    Args:
        current_user: Authenticated owner of the files.
        limit: Maximum number of files to return (default 10).

    Returns:
        Up to ``limit`` files serialized as ``FileOut``.

    Raises:
        HTTPException: 400 if ``limit`` is negative.
    """
    # A negative limit is rejected by the ORM's QuerySet.limit() and would
    # otherwise surface as an unhandled server error; report it as a client
    # error instead.
    if limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="limit must be non-negative",
        )

    files = (
        await File.filter(
            owner=current_user, is_deleted=False, last_accessed_at__isnull=False
        )
        .order_by("-last_accessed_at")
        .limit(limit)
    )
    return [await FileOut.from_tortoise_orm(f) for f in files]
@router.post("/{file_id}/star", response_model=FileOut)
async def star_file(file_id: int, current_user: User = Depends(get_current_user)):
    """Set ``is_starred`` on one of the current user's files.

    Args:
        file_id: Id of the file to star.
        current_user: Authenticated owner of the file.

    Returns:
        The updated file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if no live file with this id belongs to the user.
    """
    target = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    target.is_starred = True
    await target.save()

    await log_activity(
        user=current_user,
        action="file_starred",
        target_type="file",
        target_id=file_id,
    )
    return await FileOut.from_tortoise_orm(target)
@router.post("/{file_id}/unstar", response_model=FileOut)
async def unstar_file(file_id: int, current_user: User = Depends(get_current_user)):
    """Clear ``is_starred`` on one of the current user's files.

    Args:
        file_id: Id of the file to unstar.
        current_user: Authenticated owner of the file.

    Returns:
        The updated file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if no live file with this id belongs to the user.
    """
    target = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    target.is_starred = False
    await target.save()

    await log_activity(
        user=current_user, action="file_unstarred", target_type="file", target_id=file_id
    )
    return await FileOut.from_tortoise_orm(target)
@router.get("/deleted", response_model=List[FileOut])
async def list_deleted_files(current_user: User = Depends(get_current_user)):
    """List the current user's soft-deleted files, most recently deleted first.

    Args:
        current_user: Authenticated owner of the files.

    Returns:
        Files with ``is_deleted=True`` serialized as ``FileOut``, ordered by
        ``deleted_at`` descending.
    """
    deleted_query = File.filter(owner=current_user, is_deleted=True)
    deleted_rows = await deleted_query.order_by("-deleted_at")

    serialized = []
    for row in deleted_rows:
        serialized.append(await FileOut.from_tortoise_orm(row))
    return serialized
@router.post("/{file_id}/restore", response_model=FileOut)
async def restore_file(file_id: int, current_user: User = Depends(get_current_user)):
    """Undo the soft deletion of one of the current user's files.

    Clears ``is_deleted`` and ``deleted_at`` so the file reappears in its
    previous folder, provided no live file there has taken its name.

    Args:
        file_id: Id of the soft-deleted file to restore.
        current_user: Authenticated owner of the file.

    Returns:
        The restored file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if no deleted file with this id belongs to the
            user, 409 if a live file with the same name already exists in
            the file's folder.
    """
    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=True)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Deleted file not found"
        )

    # Check if a file with the same name exists in the parent folder.
    # The comparison uses the raw parent_id column (as rename_file does):
    # the `parent` relation was never fetched for this row, so
    # `db_file.parent` is not a Folder instance and must not be used as a
    # filter value.
    existing_file = await File.get_or_none(
        name=db_file.name,
        parent_id=db_file.parent_id,
        owner=current_user,
        is_deleted=False,
    )
    if existing_file:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="A file with the same name already exists in this location. Please rename the existing file or restore to a different location.",
        )

    db_file.is_deleted = False
    db_file.deleted_at = None
    await db_file.save()
    await log_activity(
        user=current_user, action="file_restored", target_type="file", target_id=file_id
    )
    return await FileOut.from_tortoise_orm(db_file)
class BatchOperationResult(BaseModel):
    # Shape of a batch result: the same two keys that batch_file_operations
    # returns as a plain dict.
    # NOTE(review): no route in the visible part of this file declares this
    # model as its response_model - confirm whether it is used elsewhere.
    # Files the operation was applied to (for "copy": the new copies).
    succeeded: List[FileOut]
    # One entry per file that could not be processed; batch_file_operations
    # builds these as {"file_id": ..., "reason": ...}.
    failed: List[dict]
# Operations accepted by POST /batch.
_BATCH_OPERATIONS = ("delete", "star", "unstar", "move", "copy")


async def _batch_free_copy_name(base_name, target_folder, owner):
    """Return base_name, or its first "<stem> (copy N)<ext>" variant that no live file in target_folder uses."""
    stem, extension = os.path.splitext(base_name)
    candidate = base_name
    counter = 1
    while await File.get_or_none(
        name=candidate, parent=target_folder, owner=owner, is_deleted=False
    ):
        candidate = f"{stem} (copy {counter}){extension}"
        counter += 1
    return candidate


@router.post("/batch")
async def batch_file_operations(
    batch_operation: BatchFileOperation,
    payload: Optional[BatchMoveCopyPayload] = None,
    current_user: User = Depends(get_current_user),
):
    """Apply one operation to several of the current user's files.

    Files are processed independently and in list order: a failure on one
    file is recorded and processing continues with the next.

    Args:
        batch_operation: The file ids and the operation name ("delete",
            "star", "unstar", "move" or "copy").
        payload: Required for "move" and "copy"; carries the destination
            ``target_folder_id``.
        current_user: Authenticated owner of the files.

    Returns:
        A dict with ``"succeeded"`` (affected files as ``FileOut``; for
        "copy" the newly created copies) and ``"failed"`` (a list of
        ``{"file_id": ..., "reason": ...}`` entries).

    Raises:
        HTTPException: 400 if the operation name is not supported.
    """
    operation = batch_operation.operation
    if operation not in _BATCH_OPERATIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid operation: {operation}",
        )

    # The destination is the same for every file, so resolve it once instead
    # of once per file. A problem with it is still reported per file (in
    # "failed"), not as a request-level error.
    target_folder = None
    target_error = None
    if operation in ("move", "copy"):
        if not payload or payload.target_folder_id is None:
            target_error = "Target folder not specified"
        else:
            try:
                target_folder = await Folder.get_or_none(
                    id=payload.target_folder_id, owner=current_user, is_deleted=False
                )
            except Exception as e:
                target_error = str(e)
            else:
                if not target_folder:
                    target_error = "Target folder not found"

    updated_files = []
    failed_operations = []
    for file_id in batch_operation.file_ids:
        # Boundary catch: any unexpected error on one file becomes a "failed"
        # entry so the remaining files are still processed.
        try:
            db_file = await File.get_or_none(
                id=file_id, owner=current_user, is_deleted=False
            )
            if not db_file:
                failed_operations.append(
                    {
                        "file_id": file_id,
                        "reason": "File not found or not owned by user",
                    }
                )
                continue
            if target_error is not None:
                failed_operations.append({"file_id": file_id, "reason": target_error})
                continue

            result = db_file
            logged_id = file_id
            if operation == "delete":
                db_file.is_deleted = True
                db_file.deleted_at = datetime.now()
                await db_file.save()
                await delete_thumbnail(db_file.id)
                action = "file_deleted_batch"
            elif operation == "star":
                db_file.is_starred = True
                await db_file.save()
                action = "file_starred_batch"
            elif operation == "unstar":
                db_file.is_starred = False
                await db_file.save()
                action = "file_unstarred_batch"
            elif operation == "move":
                existing_file = await File.get_or_none(
                    name=db_file.name,
                    parent=target_folder,
                    owner=current_user,
                    is_deleted=False,
                )
                # Finding the file itself (already in the target) is fine.
                if existing_file and existing_file.id != file_id:
                    failed_operations.append(
                        {
                            "file_id": file_id,
                            "reason": "File with same name exists in target folder",
                        }
                    )
                    continue
                db_file.parent = target_folder
                await db_file.save()
                action = "file_moved_batch"
            else:  # "copy"
                new_name = await _batch_free_copy_name(
                    db_file.name, target_folder, current_user
                )
                # The copy shares the original's stored content (same path).
                result = await File.create(
                    name=new_name,
                    path=db_file.path,
                    size=db_file.size,
                    mime_type=db_file.mime_type,
                    file_hash=db_file.file_hash,
                    owner=current_user,
                    parent=target_folder,
                )
                logged_id = result.id
                action = "file_copied_batch"

            await log_activity(
                user=current_user,
                action=action,
                target_type="file",
                target_id=logged_id,
            )
            updated_files.append(result)
        except Exception as e:
            failed_operations.append({"file_id": file_id, "reason": str(e)})

    return {
        "succeeded": [await FileOut.from_tortoise_orm(f) for f in updated_files],
        "failed": failed_operations,
    }
# Extensions (lower-case, without the dot) that may be edited as text even
# when the recorded MIME type does not start with "text/".
_EDITABLE_EXTENSIONS = frozenset(
    {
        "txt",
        "md",
        "log",
        "json",
        "js",
        "py",
        "html",
        "css",
        "xml",
        "yaml",
        "yml",
        "sh",
        "bat",
        "ini",
        "conf",
        "cfg",
    }
)


@router.put("/{file_id}/content", response_model=FileOut)
async def update_file_content(
    file_id: int,
    payload: FileContentUpdate,
    current_user: User = Depends(get_current_user),
):
    """Replace the text content of one of the current user's files.

    The new content is encoded as UTF-8 and stored under a content-addressed
    name (SHA-256 hex digest plus the file's extension). The file row is
    pointed at it and the user's ``used_storage_bytes`` is adjusted by the
    size difference. The previous stored content is removed only when no
    other file row of this user still references it.

    Args:
        file_id: Id of the file to update.
        payload: Carries the replacement ``content``.
        current_user: Authenticated owner of the file.

    Returns:
        The updated file serialized as ``FileOut``.

    Raises:
        HTTPException: 404 if no live file with this id belongs to the user,
            400 if the file is neither ``text/*`` nor has an editable
            extension, 507 if the change would exceed the storage quota.
    """
    db_file = await File.get_or_none(id=file_id, owner=current_user, is_deleted=False)
    if not db_file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="File not found"
        )

    file_extension = os.path.splitext(db_file.name)[1]
    is_text_mime = bool(db_file.mime_type) and db_file.mime_type.startswith("text/")
    if not is_text_mime and file_extension[1:].lower() not in _EDITABLE_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File type is not editable",
        )

    content_bytes = payload.content.encode("utf-8")
    new_size = len(content_bytes)
    # May be negative when the file shrinks.
    size_diff = new_size - db_file.size
    if current_user.used_storage_bytes + size_diff > current_user.storage_quota_bytes:
        raise HTTPException(
            status_code=status.HTTP_507_INSUFFICIENT_STORAGE,
            detail="Storage quota exceeded",
        )

    new_hash = hashlib.sha256(content_bytes).hexdigest()
    new_storage_path = f"{new_hash}{file_extension}"
    old_storage_path = db_file.path

    await storage_manager.save_file(current_user.id, new_storage_path, content_bytes)

    # Point the row at the new content before touching the old content, so a
    # failure in between never leaves the row referencing a removed path.
    db_file.path = new_storage_path
    db_file.size = new_size
    db_file.file_hash = new_hash
    # NOTE(review): the rest of this module stamps times with datetime.now();
    # confirm which convention the timestamp columns expect.
    db_file.updated_at = datetime.utcnow()
    await db_file.save()

    current_user.used_storage_bytes += size_diff
    await current_user.save()

    if new_storage_path != old_storage_path:
        # Stored content is shared: copy_file reuses `path`, and identical
        # uploads hash to the same name. Soft-deleted rows count too, since
        # they can be restored. Only drop content nothing references anymore.
        still_referenced = await File.filter(
            owner=current_user, path=old_storage_path
        ).exists()
        if not still_referenced:
            try:
                await storage_manager.delete_file(current_user.id, old_storage_path)
            except Exception:
                # Best-effort cleanup: the update itself already succeeded,
                # so a leftover blob is logged instead of failing the request.
                logging.getLogger(__name__).warning(
                    "Could not delete old content %s of file %s",
                    old_storage_path,
                    file_id,
                    exc_info=True,
                )

    await log_activity(
        user=current_user, action="file_updated", target_type="file", target_id=file_id
    )
    return await FileOut.from_tortoise_orm(db_file)