Compare commits
No commits in common. "de9cd8d1ba06ad133c0f77a357b03cc978c05ea4" and "ec641da2f3c5b0291d7a4f4299fd68e0c86898d5" have entirely different histories.
de9cd8d1ba
...
ec641da2f3
@ -1,8 +1,6 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from datetime import datetime, date, timezone, timedelta
|
from datetime import datetime, date, timezone
|
||||||
|
|
||||||
from tortoise.transactions import in_transaction
|
from tortoise.transactions import in_transaction
|
||||||
|
|
||||||
from .models import UsageRecord, UsageAggregate
|
from .models import UsageRecord, UsageAggregate
|
||||||
from ..models import User
|
from ..models import User
|
||||||
|
|
||||||
@ -56,7 +54,7 @@ class UsageTracker:
|
|||||||
target_date = date.today()
|
target_date = date.today()
|
||||||
|
|
||||||
start_of_day = datetime.combine(target_date, datetime.min.time())
|
start_of_day = datetime.combine(target_date, datetime.min.time())
|
||||||
end_of_day = datetime.combine(target_date + timedelta(days=1), datetime.min.time()) - timedelta(microseconds=1)
|
end_of_day = datetime.combine(target_date, datetime.max.time())
|
||||||
|
|
||||||
storage_records = await UsageRecord.filter(
|
storage_records = await UsageRecord.filter(
|
||||||
user=user,
|
user=user,
|
||||||
|
|||||||
@ -13,8 +13,6 @@ from .auth import get_current_user, verify_password
|
|||||||
from .models import User, File, Folder, WebDAVProperty
|
from .models import User, File, Folder, WebDAVProperty
|
||||||
from .storage import storage_manager
|
from .storage import storage_manager
|
||||||
from .activity import log_activity
|
from .activity import log_activity
|
||||||
from .settings import settings
|
|
||||||
from jose import JWTError, jwt
|
|
||||||
|
|
||||||
router = APIRouter(
|
router = APIRouter(
|
||||||
prefix="/webdav",
|
prefix="/webdav",
|
||||||
@ -68,28 +66,16 @@ async def basic_auth(authorization: Optional[str] = Header(None)):
|
|||||||
|
|
||||||
|
|
||||||
async def webdav_auth(request: Request, authorization: Optional[str] = Header(None)):
|
async def webdav_auth(request: Request, authorization: Optional[str] = Header(None)):
|
||||||
# First, try Basic Auth, which is common for WebDAV clients
|
|
||||||
user = await basic_auth(authorization)
|
user = await basic_auth(authorization)
|
||||||
if user:
|
if user:
|
||||||
return user
|
return user
|
||||||
|
|
||||||
# If Basic Auth fails or is not provided, try to authenticate using the session cookie
|
try:
|
||||||
token = request.cookies.get("access_token")
|
user = await get_current_user(request)
|
||||||
if token:
|
return user
|
||||||
try:
|
except HTTPException:
|
||||||
payload = jwt.decode(
|
pass
|
||||||
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
|
||||||
)
|
|
||||||
username: str = payload.get("sub")
|
|
||||||
if username:
|
|
||||||
user = await User.get_or_none(username=username)
|
|
||||||
if user:
|
|
||||||
return user
|
|
||||||
except JWTError:
|
|
||||||
# Token is invalid, fall through to the final exception
|
|
||||||
pass
|
|
||||||
|
|
||||||
# If all authentication methods fail, raise 401
|
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
headers={"WWW-Authenticate": 'Basic realm="MyWebdav WebDAV"'},
|
headers={"WWW-Authenticate": 'Basic realm="MyWebdav WebDAV"'},
|
||||||
@ -97,34 +83,25 @@ async def webdav_auth(request: Request, authorization: Optional[str] = Header(No
|
|||||||
|
|
||||||
|
|
||||||
async def resolve_path(path_str: str, user: User):
|
async def resolve_path(path_str: str, user: User):
|
||||||
"""
|
if not path_str or path_str == "/":
|
||||||
Resolves a path string to a resource.
|
|
||||||
Returns a tuple: (resource, parent_folder, exists)
|
|
||||||
- resource: The File or Folder object at the path, or None if not found.
|
|
||||||
- parent_folder: The parent Folder object, or None if root.
|
|
||||||
- exists: Boolean indicating if the resource at the given path exists.
|
|
||||||
"""
|
|
||||||
path_str = path_str.strip("/")
|
|
||||||
if not path_str: # Root directory
|
|
||||||
# The root exists conceptually, but has no specific resource object.
|
|
||||||
# It contains top-level files and folders.
|
|
||||||
return None, None, True
|
return None, None, True
|
||||||
|
|
||||||
parts = [p for p in path_str.split("/") if p]
|
parts = [p for p in path_str.split("/") if p]
|
||||||
|
|
||||||
|
if not parts:
|
||||||
|
return None, None, True
|
||||||
|
|
||||||
current_folder = None
|
current_folder = None
|
||||||
# Traverse the path to find the parent of the target resource
|
|
||||||
for i, part in enumerate(parts[:-1]):
|
for i, part in enumerate(parts[:-1]):
|
||||||
folder = await Folder.get_or_none(
|
folder = await Folder.get_or_none(
|
||||||
name=part, parent=current_folder, owner=user, is_deleted=False
|
name=part, parent=current_folder, owner=user, is_deleted=False
|
||||||
)
|
)
|
||||||
if not folder:
|
if not folder:
|
||||||
# A component in the middle of the path does not exist, so the full path cannot exist.
|
|
||||||
return None, None, False
|
return None, None, False
|
||||||
current_folder = folder
|
current_folder = folder
|
||||||
|
|
||||||
last_part = parts[-1]
|
last_part = parts[-1]
|
||||||
|
|
||||||
# Check for the target resource itself (can be a folder or a file)
|
|
||||||
folder = await Folder.get_or_none(
|
folder = await Folder.get_or_none(
|
||||||
name=last_part, parent=current_folder, owner=user, is_deleted=False
|
name=last_part, parent=current_folder, owner=user, is_deleted=False
|
||||||
)
|
)
|
||||||
@ -137,8 +114,7 @@ async def resolve_path(path_str: str, user: User):
|
|||||||
if file:
|
if file:
|
||||||
return file, current_folder, True
|
return file, current_folder, True
|
||||||
|
|
||||||
# The resource itself was not found, but the path to its parent is valid.
|
return None, current_folder, True
|
||||||
return None, current_folder, False
|
|
||||||
|
|
||||||
|
|
||||||
def build_href(base_path: str, name: str, is_collection: bool):
|
def build_href(base_path: str, name: str, is_collection: bool):
|
||||||
@ -244,75 +220,195 @@ async def handle_propfind(
|
|||||||
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
||||||
):
|
):
|
||||||
depth = request.headers.get("Depth", "1")
|
depth = request.headers.get("Depth", "1")
|
||||||
full_path_str = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
body = await request.body()
|
body = await request.body()
|
||||||
requested_props = parse_propfind_body(body)
|
requested_props = parse_propfind_body(body)
|
||||||
|
|
||||||
resource, parent_folder, exists = await resolve_path(full_path_str, current_user)
|
resource, parent, exists = await resolve_path(full_path, current_user)
|
||||||
|
|
||||||
if not exists:
|
if not exists and resource is None:
|
||||||
raise HTTPException(status_code=404, detail="Not found")
|
raise HTTPException(status_code=404, detail="Not found")
|
||||||
|
|
||||||
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
|
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
|
||||||
base_href = f"/webdav/{full_path_str}" if full_path_str else "/webdav/"
|
|
||||||
|
|
||||||
# This function adds a single resource to the multistatus response
|
base_href = f"/webdav/{full_path}" if full_path else "/webdav/"
|
||||||
async def add_resource_to_response(res, res_href):
|
|
||||||
|
if resource is None:
|
||||||
response = ET.SubElement(multistatus, "D:response")
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
href = ET.SubElement(response, "D:href")
|
href = ET.SubElement(response, "D:href")
|
||||||
href.text = res_href
|
href.text = base_href if base_href.endswith("/") else base_href + "/"
|
||||||
|
|
||||||
props = {}
|
props = {
|
||||||
custom_props = None
|
"resourcetype": "collection",
|
||||||
res_type = ""
|
"displayname": full_path.split("/")[-1] if full_path else "Root",
|
||||||
res_id = None
|
"creationdate": datetime.now(),
|
||||||
|
"getlastmodified": datetime.now(),
|
||||||
|
}
|
||||||
|
response.append(create_propstat_element(props))
|
||||||
|
|
||||||
if isinstance(res, Folder):
|
if depth in ["1", "infinity"]:
|
||||||
res_type, res_id = "folder", res.id
|
folders = await Folder.filter(
|
||||||
props = {
|
owner=current_user, parent=parent, is_deleted=False
|
||||||
"resourcetype": "collection", "displayname": res.name,
|
)
|
||||||
"creationdate": res.created_at, "getlastmodified": res.updated_at,
|
files = await File.filter(
|
||||||
}
|
owner=current_user, parent=parent, is_deleted=False
|
||||||
elif isinstance(res, File):
|
)
|
||||||
res_type, res_id = "file", res.id
|
|
||||||
props = {
|
|
||||||
"resourcetype": "", "displayname": res.name,
|
|
||||||
"getcontentlength": res.size, "getcontenttype": res.mime_type,
|
|
||||||
"creationdate": res.created_at, "getlastmodified": res.updated_at,
|
|
||||||
"getetag": f'"{res.file_hash}"',
|
|
||||||
}
|
|
||||||
elif res is None and (full_path_str == "" or isinstance(resource, Folder)): # Root or empty folder
|
|
||||||
props = {
|
|
||||||
"resourcetype": "collection", "displayname": resource.name if resource else "Root",
|
|
||||||
"creationdate": resource.created_at if resource else datetime.now(),
|
|
||||||
"getlastmodified": resource.updated_at if resource else datetime.now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
if res_type and (requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props))):
|
for folder in folders:
|
||||||
custom_props = await get_custom_properties(res_type, res_id)
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
|
href = ET.SubElement(response, "D:href")
|
||||||
|
href.text = build_href(base_href, folder.name, True)
|
||||||
|
|
||||||
|
props = {
|
||||||
|
"resourcetype": "collection",
|
||||||
|
"displayname": folder.name,
|
||||||
|
"creationdate": folder.created_at,
|
||||||
|
"getlastmodified": folder.updated_at,
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("folder", folder.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
|
for file in files:
|
||||||
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
|
href = ET.SubElement(response, "D:href")
|
||||||
|
href.text = build_href(base_href, file.name, False)
|
||||||
|
|
||||||
|
props = {
|
||||||
|
"resourcetype": "",
|
||||||
|
"displayname": file.name,
|
||||||
|
"getcontentlength": file.size,
|
||||||
|
"getcontenttype": file.mime_type,
|
||||||
|
"creationdate": file.created_at,
|
||||||
|
"getlastmodified": file.updated_at,
|
||||||
|
"getetag": f'"{file.file_hash}"',
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("file", file.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
|
elif isinstance(resource, Folder):
|
||||||
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
|
href = ET.SubElement(response, "D:href")
|
||||||
|
href.text = base_href if base_href.endswith("/") else base_href + "/"
|
||||||
|
|
||||||
|
props = {
|
||||||
|
"resourcetype": "collection",
|
||||||
|
"displayname": resource.name,
|
||||||
|
"creationdate": resource.created_at,
|
||||||
|
"getlastmodified": resource.updated_at,
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("folder", resource.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
response.append(create_propstat_element(props, custom_props))
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
# Add the main resource itself to the response
|
if depth in ["1", "infinity"]:
|
||||||
res_href = base_href if isinstance(resource, File) else (base_href if base_href.endswith('/') else base_href + '/')
|
folders = await Folder.filter(
|
||||||
await add_resource_to_response(resource, res_href)
|
owner=current_user, parent=resource, is_deleted=False
|
||||||
|
)
|
||||||
|
files = await File.filter(
|
||||||
|
owner=current_user, parent=resource, is_deleted=False
|
||||||
|
)
|
||||||
|
|
||||||
# If depth is 1 or infinity, add children
|
for folder in folders:
|
||||||
if depth in ["1", "infinity"]:
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
target_folder = resource if isinstance(resource, Folder) else (None if full_path_str == "" else parent_folder)
|
href = ET.SubElement(response, "D:href")
|
||||||
|
href.text = build_href(base_href, folder.name, True)
|
||||||
|
|
||||||
folders = await Folder.filter(owner=current_user, parent=target_folder, is_deleted=False)
|
props = {
|
||||||
files = await File.filter(owner=current_user, parent=target_folder, is_deleted=False)
|
"resourcetype": "collection",
|
||||||
|
"displayname": folder.name,
|
||||||
|
"creationdate": folder.created_at,
|
||||||
|
"getlastmodified": folder.updated_at,
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("folder", folder.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
for folder in folders:
|
for file in files:
|
||||||
child_href = build_href(base_href, folder.name, True)
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
await add_resource_to_response(folder, child_href)
|
href = ET.SubElement(response, "D:href")
|
||||||
for file in files:
|
href.text = build_href(base_href, file.name, False)
|
||||||
child_href = build_href(base_href, file.name, False)
|
|
||||||
await add_resource_to_response(file, child_href)
|
props = {
|
||||||
|
"resourcetype": "",
|
||||||
|
"displayname": file.name,
|
||||||
|
"getcontentlength": file.size,
|
||||||
|
"getcontenttype": file.mime_type,
|
||||||
|
"creationdate": file.created_at,
|
||||||
|
"getlastmodified": file.updated_at,
|
||||||
|
"getetag": f'"{file.file_hash}"',
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("file", file.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
|
elif isinstance(resource, File):
|
||||||
|
response = ET.SubElement(multistatus, "D:response")
|
||||||
|
href = ET.SubElement(response, "D:href")
|
||||||
|
href.text = base_href
|
||||||
|
|
||||||
|
props = {
|
||||||
|
"resourcetype": "",
|
||||||
|
"displayname": resource.name,
|
||||||
|
"getcontentlength": resource.size,
|
||||||
|
"getcontenttype": resource.mime_type,
|
||||||
|
"creationdate": resource.created_at,
|
||||||
|
"getlastmodified": resource.updated_at,
|
||||||
|
"getetag": f'"{resource.file_hash}"',
|
||||||
|
}
|
||||||
|
custom_props = (
|
||||||
|
await get_custom_properties("file", resource.id)
|
||||||
|
if requested_props == "allprop"
|
||||||
|
or (
|
||||||
|
isinstance(requested_props, list)
|
||||||
|
and any(ns != "DAV:" for ns, _ in requested_props)
|
||||||
|
)
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
response.append(create_propstat_element(props, custom_props))
|
||||||
|
|
||||||
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
|
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
|
||||||
return Response(content=xml_content, media_type="application/xml; charset=utf-8", status_code=207)
|
return Response(
|
||||||
|
content=xml_content,
|
||||||
|
media_type="application/xml; charset=utf-8",
|
||||||
|
status_code=207,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.api_route("/{full_path:path}", methods=["GET", "HEAD"])
|
@router.api_route("/{full_path:path}", methods=["GET", "HEAD"])
|
||||||
@ -320,9 +416,10 @@ async def handle_get(
|
|||||||
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
resource, _, exists = await resolve_path(full_path, current_user)
|
|
||||||
|
|
||||||
if not exists or not isinstance(resource, File):
|
resource, parent, exists = await resolve_path(full_path, current_user)
|
||||||
|
|
||||||
|
if not isinstance(resource, File):
|
||||||
raise HTTPException(status_code=404, detail="File not found")
|
raise HTTPException(status_code=404, detail="File not found")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -333,7 +430,9 @@ async def handle_get(
|
|||||||
"Content-Length": str(resource.size),
|
"Content-Length": str(resource.size),
|
||||||
"Content-Type": resource.mime_type,
|
"Content-Type": resource.mime_type,
|
||||||
"ETag": f'"{resource.file_hash}"',
|
"ETag": f'"{resource.file_hash}"',
|
||||||
"Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
|
"Last-Modified": resource.updated_at.strftime(
|
||||||
|
"%a, %d %b %Y %H:%M:%S GMT"
|
||||||
|
),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -347,7 +446,9 @@ async def handle_get(
|
|||||||
headers={
|
headers={
|
||||||
"Content-Disposition": f'attachment; filename="{resource.name}"',
|
"Content-Disposition": f'attachment; filename="{resource.name}"',
|
||||||
"ETag": f'"{resource.file_hash}"',
|
"ETag": f'"{resource.file_hash}"',
|
||||||
"Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
|
"Last-Modified": resource.updated_at.strftime(
|
||||||
|
"%a, %d %b %Y %H:%M:%S GMT"
|
||||||
|
),
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
@ -359,19 +460,18 @@ async def handle_put(
|
|||||||
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
|
|
||||||
if not full_path:
|
if not full_path:
|
||||||
raise HTTPException(status_code=400, detail="Cannot PUT to root")
|
raise HTTPException(status_code=400, detail="Cannot PUT to root")
|
||||||
|
|
||||||
parts = [p for p in full_path.split("/") if p]
|
parts = [p for p in full_path.split("/") if p]
|
||||||
file_name = parts[-1]
|
file_name = parts[-1]
|
||||||
parent_path = "/".join(parts[:-1])
|
|
||||||
|
|
||||||
parent_resource, _, parent_exists = await resolve_path(parent_path, current_user)
|
parent_path = "/".join(parts[:-1]) if len(parts) > 1 else ""
|
||||||
|
_, parent_folder, exists = await resolve_path(parent_path, current_user)
|
||||||
|
|
||||||
if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)):
|
if not exists:
|
||||||
raise HTTPException(status_code=409, detail="Parent collection does not exist")
|
raise HTTPException(status_code=409, detail="Parent folder does not exist")
|
||||||
|
|
||||||
parent_folder = parent_resource if isinstance(parent_resource, Folder) else None
|
|
||||||
|
|
||||||
file_content = await request.body()
|
file_content = await request.body()
|
||||||
file_size = len(file_content)
|
file_size = len(file_content)
|
||||||
@ -387,7 +487,8 @@ async def handle_put(
|
|||||||
await storage_manager.save_file(current_user.id, storage_path, file_content)
|
await storage_manager.save_file(current_user.id, storage_path, file_content)
|
||||||
|
|
||||||
mime_type, _ = mimetypes.guess_type(file_name)
|
mime_type, _ = mimetypes.guess_type(file_name)
|
||||||
mime_type = mime_type or "application/octet-stream"
|
if not mime_type:
|
||||||
|
mime_type = "application/octet-stream"
|
||||||
|
|
||||||
existing_file = await File.get_or_none(
|
existing_file = await File.get_or_none(
|
||||||
name=file_name, parent=parent_folder, owner=current_user, is_deleted=False
|
name=file_name, parent=parent_folder, owner=current_user, is_deleted=False
|
||||||
@ -395,21 +496,34 @@ async def handle_put(
|
|||||||
|
|
||||||
if existing_file:
|
if existing_file:
|
||||||
old_size = existing_file.size
|
old_size = existing_file.size
|
||||||
existing_file.path, existing_file.size, existing_file.mime_type, existing_file.file_hash = \
|
existing_file.path = storage_path
|
||||||
storage_path, file_size, mime_type, file_hash
|
existing_file.size = file_size
|
||||||
|
existing_file.mime_type = mime_type
|
||||||
|
existing_file.file_hash = file_hash
|
||||||
existing_file.updated_at = datetime.now()
|
existing_file.updated_at = datetime.now()
|
||||||
await existing_file.save()
|
await existing_file.save()
|
||||||
current_user.used_storage_bytes += (file_size - old_size)
|
|
||||||
|
current_user.used_storage_bytes = (
|
||||||
|
current_user.used_storage_bytes - old_size + file_size
|
||||||
|
)
|
||||||
await current_user.save()
|
await current_user.save()
|
||||||
|
|
||||||
await log_activity(current_user, "file_updated", "file", existing_file.id)
|
await log_activity(current_user, "file_updated", "file", existing_file.id)
|
||||||
return Response(status_code=204)
|
return Response(status_code=204)
|
||||||
else:
|
else:
|
||||||
db_file = await File.create(
|
db_file = await File.create(
|
||||||
name=file_name, path=storage_path, size=file_size, mime_type=mime_type,
|
name=file_name,
|
||||||
file_hash=file_hash, owner=current_user, parent=parent_folder
|
path=storage_path,
|
||||||
|
size=file_size,
|
||||||
|
mime_type=mime_type,
|
||||||
|
file_hash=file_hash,
|
||||||
|
owner=current_user,
|
||||||
|
parent=parent_folder,
|
||||||
)
|
)
|
||||||
|
|
||||||
current_user.used_storage_bytes += file_size
|
current_user.used_storage_bytes += file_size
|
||||||
await current_user.save()
|
await current_user.save()
|
||||||
|
|
||||||
await log_activity(current_user, "file_created", "file", db_file.id)
|
await log_activity(current_user, "file_created", "file", db_file.id)
|
||||||
return Response(status_code=201)
|
return Response(status_code=201)
|
||||||
|
|
||||||
@ -419,12 +533,13 @@ async def handle_delete(
|
|||||||
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
|
|
||||||
if not full_path:
|
if not full_path:
|
||||||
raise HTTPException(status_code=400, detail="Cannot DELETE root")
|
raise HTTPException(status_code=400, detail="Cannot DELETE root")
|
||||||
|
|
||||||
resource, _, exists = await resolve_path(full_path, current_user)
|
resource, parent, exists = await resolve_path(full_path, current_user)
|
||||||
|
|
||||||
if not exists:
|
if not resource:
|
||||||
raise HTTPException(status_code=404, detail="Resource not found")
|
raise HTTPException(status_code=404, detail="Resource not found")
|
||||||
|
|
||||||
if isinstance(resource, File):
|
if isinstance(resource, File):
|
||||||
@ -433,13 +548,7 @@ async def handle_delete(
|
|||||||
await resource.save()
|
await resource.save()
|
||||||
await log_activity(current_user, "file_deleted", "file", resource.id)
|
await log_activity(current_user, "file_deleted", "file", resource.id)
|
||||||
elif isinstance(resource, Folder):
|
elif isinstance(resource, Folder):
|
||||||
child_files = await File.filter(parent=resource, is_deleted=False).count()
|
|
||||||
child_folders = await Folder.filter(parent=resource, is_deleted=False).count()
|
|
||||||
if child_files > 0 or child_folders > 0:
|
|
||||||
raise HTTPException(status_code=409, detail="Folder is not empty")
|
|
||||||
|
|
||||||
resource.is_deleted = True
|
resource.is_deleted = True
|
||||||
resource.deleted_at = datetime.now()
|
|
||||||
await resource.save()
|
await resource.save()
|
||||||
await log_activity(current_user, "folder_deleted", "folder", resource.id)
|
await log_activity(current_user, "folder_deleted", "folder", resource.id)
|
||||||
|
|
||||||
@ -451,25 +560,30 @@ async def handle_mkcol(
|
|||||||
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
if not full_path:
|
|
||||||
raise HTTPException(status_code=405, detail="Cannot MKCOL at root")
|
|
||||||
|
|
||||||
resource, _, exists = await resolve_path(full_path, current_user)
|
if not full_path:
|
||||||
if exists:
|
raise HTTPException(status_code=400, detail="Cannot MKCOL at root")
|
||||||
raise HTTPException(status_code=405, detail="Resource already exists")
|
|
||||||
|
|
||||||
parts = [p for p in full_path.split("/") if p]
|
parts = [p for p in full_path.split("/") if p]
|
||||||
folder_name = parts[-1]
|
folder_name = parts[-1]
|
||||||
parent_path = "/".join(parts[:-1])
|
|
||||||
|
|
||||||
parent_resource, _, parent_exists = await resolve_path(parent_path, current_user)
|
parent_path = "/".join(parts[:-1]) if len(parts) > 1 else ""
|
||||||
|
_, parent_folder, exists = await resolve_path(parent_path, current_user)
|
||||||
|
|
||||||
if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)):
|
if not exists:
|
||||||
raise HTTPException(status_code=409, detail="Parent collection does not exist")
|
raise HTTPException(status_code=409, detail="Parent folder does not exist")
|
||||||
|
|
||||||
parent_folder = parent_resource if isinstance(parent_resource, Folder) else None
|
existing = await Folder.get_or_none(
|
||||||
|
name=folder_name, parent=parent_folder, owner=current_user, is_deleted=False
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
raise HTTPException(status_code=405, detail="Folder already exists")
|
||||||
|
|
||||||
|
folder = await Folder.create(
|
||||||
|
name=folder_name, parent=parent_folder, owner=current_user
|
||||||
|
)
|
||||||
|
|
||||||
folder = await Folder.create(name=folder_name, parent=parent_folder, owner=current_user)
|
|
||||||
await log_activity(current_user, "folder_created", "folder", folder.id)
|
await log_activity(current_user, "folder_created", "folder", folder.id)
|
||||||
return Response(status_code=201)
|
return Response(status_code=201)
|
||||||
|
|
||||||
@ -480,52 +594,55 @@ async def handle_copy(
|
|||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
destination = request.headers.get("Destination")
|
destination = request.headers.get("Destination")
|
||||||
overwrite = request.headers.get("Overwrite", "T").upper()
|
overwrite = request.headers.get("Overwrite", "T")
|
||||||
|
|
||||||
if not destination:
|
if not destination:
|
||||||
raise HTTPException(status_code=400, detail="Destination header required")
|
raise HTTPException(status_code=400, detail="Destination header required")
|
||||||
|
|
||||||
dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/")
|
dest_path = unquote(urlparse(destination).path)
|
||||||
source_resource, _, source_exists = await resolve_path(full_path, current_user)
|
dest_path = dest_path.replace("/webdav/", "").strip("/")
|
||||||
|
|
||||||
if not source_exists:
|
source_resource, _, exists = await resolve_path(full_path, current_user)
|
||||||
|
|
||||||
|
if not source_resource:
|
||||||
raise HTTPException(status_code=404, detail="Source not found")
|
raise HTTPException(status_code=404, detail="Source not found")
|
||||||
|
|
||||||
if not isinstance(source_resource, File):
|
if not isinstance(source_resource, File):
|
||||||
raise HTTPException(status_code=501, detail="Only file copy is implemented")
|
raise HTTPException(status_code=501, detail="Only file copy is implemented")
|
||||||
|
|
||||||
dest_name = dest_path.split("/")[-1]
|
dest_parts = [p for p in dest_path.split("/") if p]
|
||||||
dest_parent_path = "/".join(dest_path.split("/")[:-1])
|
dest_name = dest_parts[-1]
|
||||||
|
dest_parent_path = "/".join(dest_parts[:-1]) if len(dest_parts) > 1 else ""
|
||||||
|
|
||||||
dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user)
|
_, dest_parent, exists = await resolve_path(dest_parent_path, current_user)
|
||||||
|
|
||||||
if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)):
|
if not exists:
|
||||||
raise HTTPException(status_code=409, detail="Destination parent collection does not exist")
|
raise HTTPException(status_code=409, detail="Destination parent does not exist")
|
||||||
|
|
||||||
dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None
|
existing_dest = await File.get_or_none(
|
||||||
|
name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False
|
||||||
|
)
|
||||||
|
|
||||||
existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user)
|
if existing_dest and overwrite == "F":
|
||||||
|
raise HTTPException(
|
||||||
if existing_dest_exists and overwrite == "F":
|
status_code=412, detail="Destination exists and overwrite is false"
|
||||||
raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'")
|
|
||||||
|
|
||||||
if existing_dest_exists:
|
|
||||||
# Update existing_dest instead of deleting and recreating
|
|
||||||
existing_dest.path = source_resource.path
|
|
||||||
existing_dest.size = source_resource.size
|
|
||||||
existing_dest.mime_type = source_resource.mime_type
|
|
||||||
existing_dest.file_hash = source_resource.file_hash
|
|
||||||
existing_dest.updated_at = datetime.now()
|
|
||||||
await existing_dest.save()
|
|
||||||
await log_activity(current_user, "file_copied", "file", existing_dest.id)
|
|
||||||
return Response(status_code=204)
|
|
||||||
else:
|
|
||||||
new_file = await File.create(
|
|
||||||
name=dest_name, path=source_resource.path, size=source_resource.size,
|
|
||||||
mime_type=source_resource.mime_type, file_hash=source_resource.file_hash,
|
|
||||||
owner=current_user, parent=dest_parent_folder
|
|
||||||
)
|
)
|
||||||
await log_activity(current_user, "file_copied", "file", new_file.id)
|
|
||||||
return Response(status_code=201)
|
if existing_dest:
|
||||||
|
await existing_dest.delete()
|
||||||
|
|
||||||
|
new_file = await File.create(
|
||||||
|
name=dest_name,
|
||||||
|
path=source_resource.path,
|
||||||
|
size=source_resource.size,
|
||||||
|
mime_type=source_resource.mime_type,
|
||||||
|
file_hash=source_resource.file_hash,
|
||||||
|
owner=current_user,
|
||||||
|
parent=dest_parent,
|
||||||
|
)
|
||||||
|
|
||||||
|
await log_activity(current_user, "file_copied", "file", new_file.id)
|
||||||
|
return Response(status_code=201 if not existing_dest else 204)
|
||||||
|
|
||||||
|
|
||||||
@router.api_route("/{full_path:path}", methods=["MOVE"])
|
@router.api_route("/{full_path:path}", methods=["MOVE"])
|
||||||
@ -534,75 +651,68 @@ async def handle_move(
|
|||||||
):
|
):
|
||||||
full_path = unquote(full_path).strip("/")
|
full_path = unquote(full_path).strip("/")
|
||||||
destination = request.headers.get("Destination")
|
destination = request.headers.get("Destination")
|
||||||
overwrite = request.headers.get("Overwrite", "T").upper()
|
overwrite = request.headers.get("Overwrite", "T")
|
||||||
|
|
||||||
if not destination:
|
if not destination:
|
||||||
raise HTTPException(status_code=400, detail="Destination header required")
|
raise HTTPException(status_code=400, detail="Destination header required")
|
||||||
|
|
||||||
dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/")
|
dest_path = unquote(urlparse(destination).path)
|
||||||
source_resource, _, source_exists = await resolve_path(full_path, current_user)
|
dest_path = dest_path.replace("/webdav/", "").strip("/")
|
||||||
|
|
||||||
if not source_exists:
|
source_resource, _, exists = await resolve_path(full_path, current_user)
|
||||||
|
|
||||||
|
if not source_resource:
|
||||||
raise HTTPException(status_code=404, detail="Source not found")
|
raise HTTPException(status_code=404, detail="Source not found")
|
||||||
|
|
||||||
dest_name = dest_path.split("/")[-1]
|
dest_parts = [p for p in dest_path.split("/") if p]
|
||||||
dest_parent_path = "/".join(dest_path.split("/")[:-1])
|
dest_name = dest_parts[-1]
|
||||||
|
dest_parent_path = "/".join(dest_parts[:-1]) if len(dest_parts) > 1 else ""
|
||||||
|
|
||||||
dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user)
|
_, dest_parent, exists = await resolve_path(dest_parent_path, current_user)
|
||||||
|
|
||||||
if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)):
|
if not exists:
|
||||||
raise HTTPException(status_code=409, detail="Destination parent collection does not exist")
|
raise HTTPException(status_code=409, detail="Destination parent does not exist")
|
||||||
|
|
||||||
dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None
|
if isinstance(source_resource, File):
|
||||||
|
existing_dest = await File.get_or_none(
|
||||||
|
name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False
|
||||||
|
)
|
||||||
|
|
||||||
existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user)
|
if existing_dest and overwrite == "F":
|
||||||
|
raise HTTPException(
|
||||||
if existing_dest_exists and overwrite == "F":
|
status_code=412, detail="Destination exists and overwrite is false"
|
||||||
raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'")
|
|
||||||
|
|
||||||
# Handle overwrite scenario
|
|
||||||
if existing_dest_exists:
|
|
||||||
if isinstance(source_resource, File):
|
|
||||||
# Update existing_dest with source_resource's properties
|
|
||||||
existing_dest.name = dest_name
|
|
||||||
existing_dest.path = source_resource.path
|
|
||||||
existing_dest.size = source_resource.size
|
|
||||||
existing_dest.mime_type = source_resource.mime_type
|
|
||||||
existing_dest.file_hash = source_resource.file_hash
|
|
||||||
existing_dest.parent = dest_parent_folder
|
|
||||||
existing_dest.updated_at = datetime.now()
|
|
||||||
await existing_dest.save()
|
|
||||||
await log_activity(current_user, "file_moved_overwrite", "file", existing_dest.id)
|
|
||||||
elif isinstance(source_resource, Folder):
|
|
||||||
raise HTTPException(status_code=501, detail="Folder move overwrite not implemented")
|
|
||||||
|
|
||||||
# Mark source as deleted
|
|
||||||
source_resource.is_deleted = True
|
|
||||||
source_resource.deleted_at = datetime.now()
|
|
||||||
await source_resource.save()
|
|
||||||
await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id)
|
|
||||||
|
|
||||||
return Response(status_code=204) # Overwrite means 204 No Content
|
|
||||||
|
|
||||||
# Handle non-overwrite scenario (create new resource at destination)
|
|
||||||
else:
|
|
||||||
if isinstance(source_resource, File):
|
|
||||||
new_file = await File.create(
|
|
||||||
name=dest_name, path=source_resource.path, size=source_resource.size,
|
|
||||||
mime_type=source_resource.mime_type, file_hash=source_resource.file_hash,
|
|
||||||
owner=current_user, parent=dest_parent_folder
|
|
||||||
)
|
)
|
||||||
await log_activity(current_user, "file_moved_created", "file", new_file.id)
|
|
||||||
elif isinstance(source_resource, Folder):
|
|
||||||
raise HTTPException(status_code=501, detail="Folder move not implemented")
|
|
||||||
|
|
||||||
# Mark source as deleted
|
if existing_dest:
|
||||||
source_resource.is_deleted = True
|
await existing_dest.delete()
|
||||||
source_resource.deleted_at = datetime.now()
|
|
||||||
|
source_resource.name = dest_name
|
||||||
|
source_resource.parent = dest_parent
|
||||||
await source_resource.save()
|
await source_resource.save()
|
||||||
await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id)
|
|
||||||
|
|
||||||
return Response(status_code=201) # New resource created means 201 Created
|
await log_activity(current_user, "file_moved", "file", source_resource.id)
|
||||||
|
return Response(status_code=201 if not existing_dest else 204)
|
||||||
|
|
||||||
|
elif isinstance(source_resource, Folder):
|
||||||
|
existing_dest = await Folder.get_or_none(
|
||||||
|
name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing_dest and overwrite == "F":
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=412, detail="Destination exists and overwrite is false"
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing_dest:
|
||||||
|
existing_dest.is_deleted = True
|
||||||
|
await existing_dest.save()
|
||||||
|
|
||||||
|
source_resource.name = dest_name
|
||||||
|
source_resource.parent = dest_parent
|
||||||
|
await source_resource.save()
|
||||||
|
|
||||||
|
await log_activity(current_user, "folder_moved", "folder", source_resource.id)
|
||||||
|
return Response(status_code=201 if not existing_dest else 204)
|
||||||
|
|
||||||
|
|
||||||
@router.api_route("/{full_path:path}", methods=["LOCK"])
|
@router.api_route("/{full_path:path}", methods=["LOCK"])
|
||||||
|
|||||||
@ -1,735 +0,0 @@
|
|||||||
import pytest
|
|
||||||
import pytest_asyncio
|
|
||||||
from httpx import AsyncClient, ASGITransport
|
|
||||||
from fastapi import status
|
|
||||||
from mywebdav.main import app
|
|
||||||
from mywebdav.models import User, Folder, File, WebDAVProperty
|
|
||||||
from mywebdav.auth import get_password_hash
|
|
||||||
import base64
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def test_user():
|
|
||||||
user = await User.create(
|
|
||||||
username="testuser",
|
|
||||||
email="test@example.com",
|
|
||||||
hashed_password=get_password_hash("testpass"),
|
|
||||||
is_active=True,
|
|
||||||
is_superuser=False,
|
|
||||||
)
|
|
||||||
yield user
|
|
||||||
await user.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def test_folder(test_user):
|
|
||||||
folder = await Folder.create(
|
|
||||||
name="testfolder",
|
|
||||||
owner=test_user,
|
|
||||||
)
|
|
||||||
yield folder
|
|
||||||
await folder.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture
|
|
||||||
async def test_file(test_user, test_folder):
|
|
||||||
file = await File.create(
|
|
||||||
name="testfile.txt",
|
|
||||||
path="testfile.txt",
|
|
||||||
size=13,
|
|
||||||
mime_type="text/plain",
|
|
||||||
file_hash="dummyhash",
|
|
||||||
owner=test_user,
|
|
||||||
parent=test_folder,
|
|
||||||
)
|
|
||||||
yield file
|
|
||||||
await file.delete()
|
|
||||||
|
|
||||||
|
|
||||||
def get_basic_auth_header(username, password):
|
|
||||||
credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
|
|
||||||
return {"Authorization": f"Basic {credentials}"}
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_options():
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.options("/webdav/")
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert "DAV" in response.headers
|
|
||||||
assert "Allow" in response.headers
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_root_unauthorized():
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
"/webdav/",
|
|
||||||
headers={"Depth": "1"},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_root(test_user):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
"/webdav/",
|
|
||||||
headers={
|
|
||||||
"Depth": "1",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
# Parse XML response
|
|
||||||
content = response.text
|
|
||||||
assert "<D:multistatus" in content
|
|
||||||
assert "<D:response>" in content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_folder(test_user, test_folder):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
f"/webdav/{test_folder.name}/",
|
|
||||||
headers={
|
|
||||||
"Depth": "1",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
content = response.text
|
|
||||||
assert test_folder.name in content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_file(test_user, test_file):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Depth": "0",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
content = response.text
|
|
||||||
assert test_file.name in content
|
|
||||||
assert "text/plain" in content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_get_file(test_user, test_file):
|
|
||||||
# Mock storage manager to return file content
|
|
||||||
from mywebdav import storage
|
|
||||||
original_get_file = storage.storage_manager.get_file
|
|
||||||
|
|
||||||
async def mock_get_file(user_id, path):
|
|
||||||
if path == test_file.path:
|
|
||||||
yield b"Hello, World!"
|
|
||||||
else:
|
|
||||||
raise FileNotFoundError()
|
|
||||||
|
|
||||||
storage.storage_manager.get_file = mock_get_file
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.get(
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass")
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.content == b"Hello, World!"
|
|
||||||
finally:
|
|
||||||
storage.storage_manager.get_file = original_get_file
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_put_file(test_user, test_folder):
|
|
||||||
from mywebdav import storage
|
|
||||||
original_save_file = storage.storage_manager.save_file
|
|
||||||
|
|
||||||
saved_content = None
|
|
||||||
saved_path = None
|
|
||||||
|
|
||||||
async def mock_save_file(user_id, path, content):
|
|
||||||
nonlocal saved_content, saved_path
|
|
||||||
saved_content = content
|
|
||||||
saved_path = path
|
|
||||||
|
|
||||||
storage.storage_manager.save_file = mock_save_file
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.put(
|
|
||||||
f"/webdav/{test_folder.name}/newfile.txt",
|
|
||||||
content=b"New file content",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass")
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_201_CREATED
|
|
||||||
assert saved_content == b"New file content"
|
|
||||||
|
|
||||||
# Check if file was created in DB
|
|
||||||
file = await File.get_or_none(
|
|
||||||
name="newfile.txt", parent=test_folder, owner=test_user, is_deleted=False
|
|
||||||
)
|
|
||||||
assert file is not None
|
|
||||||
assert file.path == saved_path
|
|
||||||
await file.delete()
|
|
||||||
finally:
|
|
||||||
storage.storage_manager.save_file = original_save_file
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_mkcol(test_user):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MKCOL",
|
|
||||||
"/webdav/newfolder/",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass")
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_201_CREATED
|
|
||||||
|
|
||||||
# Check if folder was created
|
|
||||||
folder = await Folder.get_or_none(
|
|
||||||
name="newfolder", parent=None, owner=test_user, is_deleted=False
|
|
||||||
)
|
|
||||||
assert folder is not None
|
|
||||||
await folder.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_delete_file(test_user, test_file):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.delete(
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass")
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
|
|
||||||
# Check if file was marked as deleted
|
|
||||||
updated_file = await File.get(id=test_file.id)
|
|
||||||
assert updated_file.is_deleted == True
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_delete_folder(test_user, test_folder):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.delete(
|
|
||||||
f"/webdav/{test_folder.name}/",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass")
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
|
|
||||||
# Check if folder was marked as deleted
|
|
||||||
updated_folder = await Folder.get(id=test_folder.id)
|
|
||||||
assert updated_folder.is_deleted == True
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_copy_file(test_user, test_file):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"COPY",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{test_file.parent.name}/copied_{test_file.name}",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_201_CREATED
|
|
||||||
|
|
||||||
# Check if copy was created
|
|
||||||
copied_file = await File.get_or_none(
|
|
||||||
name=f"copied_{test_file.name}", parent=test_file.parent, owner=test_user, is_deleted=False
|
|
||||||
)
|
|
||||||
assert copied_file is not None
|
|
||||||
await copied_file.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_move_file(test_user, test_file, test_folder):
|
|
||||||
# Create another folder
|
|
||||||
dest_folder = await Folder.create(
|
|
||||||
name="destfolder",
|
|
||||||
owner=test_user,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MOVE",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_201_CREATED
|
|
||||||
|
|
||||||
# Check if file was moved
|
|
||||||
moved_file = await File.get_or_none(
|
|
||||||
name=test_file.name, parent=dest_folder, owner=test_user, is_deleted=False
|
|
||||||
)
|
|
||||||
assert moved_file is not None
|
|
||||||
|
|
||||||
# Original should be gone
|
|
||||||
original_file = await File.get_or_none(
|
|
||||||
id=test_file.id, is_deleted=False
|
|
||||||
)
|
|
||||||
assert original_file is None
|
|
||||||
finally:
|
|
||||||
await dest_folder.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_lock_unlock(test_user, test_file):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
# Lock
|
|
||||||
lock_xml = """<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<D:lockinfo xmlns:D="DAV:">
|
|
||||||
<D:lockscope><D:exclusive/></D:lockscope>
|
|
||||||
<D:locktype><D:write/></D:locktype>
|
|
||||||
</D:lockinfo>"""
|
|
||||||
|
|
||||||
response = await client.request(
|
|
||||||
"LOCK",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
content=lock_xml,
|
|
||||||
headers={
|
|
||||||
"Content-Type": "application/xml",
|
|
||||||
"Timeout": "Second-3600",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
lock_token = response.headers.get("Lock-Token")
|
|
||||||
assert lock_token is not None
|
|
||||||
|
|
||||||
# Unlock
|
|
||||||
response = await client.request(
|
|
||||||
"UNLOCK",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Lock-Token": lock_token,
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_allprop(test_user, test_file):
|
|
||||||
propfind_xml = """<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<D:propfind xmlns:D="DAV:">
|
|
||||||
<D:allprop/>
|
|
||||||
</D:propfind>"""
|
|
||||||
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
content=propfind_xml,
|
|
||||||
headers={
|
|
||||||
"Content-Type": "application/xml",
|
|
||||||
"Depth": "0",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
content = response.text
|
|
||||||
assert "getcontentlength" in content
|
|
||||||
assert "getcontenttype" in content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_proppatch(test_user, test_file):
|
|
||||||
proppatch_xml = """<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<D:propertyupdate xmlns:D="DAV:">
|
|
||||||
<D:set>
|
|
||||||
<D:prop>
|
|
||||||
<custom:author xmlns:custom="http://example.com">Test Author</custom:author>
|
|
||||||
</D:prop>
|
|
||||||
</D:set>
|
|
||||||
</D:propertyupdate>"""
|
|
||||||
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPPATCH",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
content=proppatch_xml,
|
|
||||||
headers={
|
|
||||||
"Content-Type": "application/xml",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
|
|
||||||
# Check if property was set
|
|
||||||
from mywebdav.models import WebDAVProperty
|
|
||||||
prop = await WebDAVProperty.get_or_none(
|
|
||||||
resource_type="file",
|
|
||||||
resource_id=test_file.id,
|
|
||||||
namespace="http://example.com",
|
|
||||||
name="author"
|
|
||||||
)
|
|
||||||
await prop.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_head_file(test_user, test_file):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.head(
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.headers["Content-Length"] == str(test_file.size)
|
|
||||||
assert response.headers["Content-Type"] == test_file.mime_type
|
|
||||||
assert response.content == b""
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_put_update_file(test_user, test_file):
|
|
||||||
from mywebdav import storage
|
|
||||||
original_save_file = storage.storage_manager.save_file
|
|
||||||
saved_content = None
|
|
||||||
async def mock_save_file(user_id, path, content):
|
|
||||||
nonlocal saved_content
|
|
||||||
saved_content = content
|
|
||||||
storage.storage_manager.save_file = mock_save_file
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.put(
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
content=b"Updated content",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
assert saved_content == b"Updated content"
|
|
||||||
|
|
||||||
updated_file = await File.get(id=test_file.id)
|
|
||||||
assert updated_file.size == len(b"Updated content")
|
|
||||||
finally:
|
|
||||||
storage.storage_manager.save_file = original_save_file
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_copy_file_overwrite_true(test_user, test_file):
|
|
||||||
dest_file = await File.create(
|
|
||||||
name="destination.txt", parent=test_file.parent, owner=test_user,
|
|
||||||
path="dest.txt", size=1, mime_type="text/plain", file_hash="oldhash"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"COPY",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{test_file.parent.name}/destination.txt",
|
|
||||||
"Overwrite": "T",
|
|
||||||
**get_basic_auth_header("testuser", "testpass"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
|
|
||||||
# The original destination file should have been updated
|
|
||||||
updated_dest_file = await File.get(id=dest_file.id)
|
|
||||||
assert updated_dest_file.is_deleted == False
|
|
||||||
assert updated_dest_file.file_hash == test_file.file_hash
|
|
||||||
assert updated_dest_file.size == test_file.size
|
|
||||||
assert updated_dest_file.name == dest_file.name # Name should remain the same
|
|
||||||
assert updated_dest_file.parent_id == test_file.parent_id # Parent should remain the same
|
|
||||||
|
|
||||||
# No new file should have been created with the destination name
|
|
||||||
new_file_check = await File.get_or_none(name="destination.txt", parent=test_file.parent, is_deleted=False)
|
|
||||||
assert new_file_check.id == updated_dest_file.id # Should be the same updated file
|
|
||||||
|
|
||||||
|
|
||||||
finally:
|
|
||||||
await dest_file.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_move_file_overwrite_true(test_user, test_file):
|
|
||||||
dest_folder = await Folder.create(name="destfolder", owner=test_user)
|
|
||||||
existing_dest_file = await File.create(
|
|
||||||
name=test_file.name, parent=dest_folder, owner=test_user,
|
|
||||||
path="existing.txt", size=1, mime_type="text/plain", file_hash="oldhash"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MOVE",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
|
|
||||||
"Overwrite": "T",
|
|
||||||
**get_basic_auth_header("testuser", "testpass"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
|
||||||
|
|
||||||
# The original source file should be marked as deleted
|
|
||||||
original_source_file = await File.get_or_none(id=test_file.id, is_deleted=True)
|
|
||||||
assert original_source_file is not None
|
|
||||||
|
|
||||||
# The existing destination file should have been updated
|
|
||||||
updated_dest_file = await File.get(id=existing_dest_file.id)
|
|
||||||
assert updated_dest_file.is_deleted == False
|
|
||||||
assert updated_dest_file.file_hash == test_file.file_hash # Should have source's hash
|
|
||||||
assert updated_dest_file.size == test_file.size # Should have source's size
|
|
||||||
assert updated_dest_file.name == existing_dest_file.name # Name should remain the same
|
|
||||||
assert updated_dest_file.parent_id == dest_folder.id # Parent should remain the same
|
|
||||||
|
|
||||||
# No new file should have been created with the destination name
|
|
||||||
new_file_check = await File.get_or_none(name=test_file.name, parent=dest_folder, is_deleted=False)
|
|
||||||
assert new_file_check.id == updated_dest_file.id # Should be the same updated file
|
|
||||||
|
|
||||||
finally:
|
|
||||||
await dest_folder.delete()
|
|
||||||
await existing_dest_file.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_proppatch_remove(test_user, test_file):
|
|
||||||
# First, set a property
|
|
||||||
prop = await WebDAVProperty.create(
|
|
||||||
resource_type="file", resource_id=test_file.id,
|
|
||||||
namespace="http://example.com", name="author", value="Test Author"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now, remove it
|
|
||||||
proppatch_xml = """<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<D:propertyupdate xmlns:D="DAV:">
|
|
||||||
<D:remove>
|
|
||||||
<D:prop>
|
|
||||||
<custom:author xmlns:custom="http://example.com"/>
|
|
||||||
</D:prop>
|
|
||||||
</D:remove>
|
|
||||||
</D:propertyupdate>"""
|
|
||||||
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPPATCH",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
content=proppatch_xml,
|
|
||||||
headers={
|
|
||||||
"Content-Type": "application/xml",
|
|
||||||
**get_basic_auth_header("testuser", "testpass"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_207_MULTI_STATUS
|
|
||||||
assert "200 OK" in response.text
|
|
||||||
|
|
||||||
# Check if property was removed
|
|
||||||
removed_prop = await WebDAVProperty.get_or_none(id=prop.id)
|
|
||||||
assert removed_prop is None
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_propfind_not_found(test_user):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"PROPFIND",
|
|
||||||
"/webdav/nonexistentfolder/",
|
|
||||||
headers={
|
|
||||||
"Depth": "1",
|
|
||||||
**get_basic_auth_header("testuser", "testpass")
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_mkcol_nested_fail(test_user):
|
|
||||||
"""Test creating a nested directory where the parent does not exist."""
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MKCOL",
|
|
||||||
"/webdav/parent/newfolder/",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
# Expect 409 Conflict because parent collection does not exist
|
|
||||||
assert response.status_code == status.HTTP_409_CONFLICT
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_mkcol_already_exists(test_user, test_folder):
|
|
||||||
"""Test creating a directory that already exists."""
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MKCOL",
|
|
||||||
f"/webdav/{test_folder.name}/",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
# Expect 405 Method Not Allowed if collection already exists
|
|
||||||
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_delete_non_existent_file(test_user):
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.delete(
|
|
||||||
"/webdav/nonexistent.txt",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_delete_non_empty_folder(test_user, test_file):
|
|
||||||
"""A non-empty folder cannot be deleted."""
|
|
||||||
folder_to_delete = test_file.parent
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.delete(
|
|
||||||
f"/webdav/{folder_to_delete.name}/",
|
|
||||||
headers=get_basic_auth_header("testuser", "testpass"),
|
|
||||||
)
|
|
||||||
# Expect 409 Conflict as the folder is not empty
|
|
||||||
assert response.status_code == status.HTTP_409_CONFLICT
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_copy_file_overwrite_false_fail(test_user, test_file):
|
|
||||||
# Create a destination file that already exists
|
|
||||||
dest_file = await File.create(
|
|
||||||
name=f"copied_{test_file.name}",
|
|
||||||
path=f"copied_{test_file.name}",
|
|
||||||
size=1,
|
|
||||||
mime_type="text/plain",
|
|
||||||
file_hash="dummyhash2",
|
|
||||||
owner=test_user,
|
|
||||||
parent=test_file.parent,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"COPY",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{test_file.parent.name}/{dest_file.name}",
|
|
||||||
"Overwrite": "F",
|
|
||||||
**get_basic_auth_header("testuser", "testpass"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
# 412 Precondition Failed because Overwrite is 'F' and destination exists
|
|
||||||
assert response.status_code == status.HTTP_412_PRECONDITION_FAILED
|
|
||||||
finally:
|
|
||||||
await dest_file.delete()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_webdav_move_file_overwrite_false_fail(test_user, test_file):
|
|
||||||
dest_folder = await Folder.create(name="destfolder", owner=test_user)
|
|
||||||
# Create a file with the same name at the destination
|
|
||||||
existing_dest_file = await File.create(
|
|
||||||
name=test_file.name,
|
|
||||||
path=f"{dest_folder.name}/{test_file.name}",
|
|
||||||
size=1,
|
|
||||||
mime_type="text/plain",
|
|
||||||
file_hash="dummyhash3",
|
|
||||||
owner=test_user,
|
|
||||||
parent=dest_folder,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with AsyncClient(
|
|
||||||
transport=ASGITransport(app=app), base_url="http://test"
|
|
||||||
) as client:
|
|
||||||
response = await client.request(
|
|
||||||
"MOVE",
|
|
||||||
f"/webdav/{test_file.parent.name}/{test_file.name}",
|
|
||||||
headers={
|
|
||||||
"Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
|
|
||||||
"Overwrite": "F",
|
|
||||||
**get_basic_auth_header("testuser", "testpass"),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
# 412 Precondition Failed because Overwrite is 'F' and destination exists
|
|
||||||
assert response.status_code == status.HTTP_412_PRECONDITION_FAILED
|
|
||||||
finally:
|
|
||||||
await dest_folder.delete()
|
|
||||||
await existing_dest_file.delete()
|
|
||||||
Loading…
Reference in New Issue
Block a user