diff --git a/mywebdav/webdav.py b/mywebdav/webdav.py index f9ef5a9..408fa36 100644 --- a/mywebdav/webdav.py +++ b/mywebdav/webdav.py @@ -13,6 +13,8 @@ from .auth import get_current_user, verify_password from .models import User, File, Folder, WebDAVProperty from .storage import storage_manager from .activity import log_activity +from .settings import settings +from jose import JWTError, jwt router = APIRouter( prefix="/webdav", @@ -66,16 +68,28 @@ async def basic_auth(authorization: Optional[str] = Header(None)): async def webdav_auth(request: Request, authorization: Optional[str] = Header(None)): + # First, try Basic Auth, which is common for WebDAV clients user = await basic_auth(authorization) if user: return user - try: - user = await get_current_user(request) - return user - except HTTPException: - pass + # If Basic Auth fails or is not provided, try to authenticate using the session cookie + token = request.cookies.get("access_token") + if token: + try: + payload = jwt.decode( + token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] + ) + username: str = payload.get("sub") + if username: + user = await User.get_or_none(username=username) + if user: + return user + except JWTError: + # Token is invalid, fall through to the final exception + pass + # If all authentication methods fail, raise 401 raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, headers={"WWW-Authenticate": 'Basic realm="MyWebdav WebDAV"'}, @@ -83,25 +97,34 @@ async def webdav_auth(request: Request, authorization: Optional[str] = Header(No async def resolve_path(path_str: str, user: User): - if not path_str or path_str == "/": + """ + Resolves a path string to a resource. + Returns a tuple: (resource, parent_folder, exists) + - resource: The File or Folder object at the path, or None if not found. + - parent_folder: The parent Folder object, or None if root. + - exists: Boolean indicating if the resource at the given path exists. + """ + path_str = path_str.strip("/") + if not path_str: # Root directory + # The root exists conceptually, but has no specific resource object. + # It contains top-level files and folders. return None, None, True parts = [p for p in path_str.split("/") if p] - - if not parts: - return None, None, True - current_folder = None + # Traverse the path to find the parent of the target resource for i, part in enumerate(parts[:-1]): folder = await Folder.get_or_none( name=part, parent=current_folder, owner=user, is_deleted=False ) if not folder: + # A component in the middle of the path does not exist, so the full path cannot exist. return None, None, False current_folder = folder last_part = parts[-1] + # Check for the target resource itself (can be a folder or a file) folder = await Folder.get_or_none( name=last_part, parent=current_folder, owner=user, is_deleted=False ) @@ -114,7 +137,8 @@ async def resolve_path(path_str: str, user: User): if file: return file, current_folder, True - return None, current_folder, True + # The resource itself was not found, but the path to its parent is valid. + return None, current_folder, False def build_href(base_path: str, name: str, is_collection: bool): @@ -220,195 +244,75 @@ async def handle_propfind( request: Request, full_path: str, current_user: User = Depends(webdav_auth) ): depth = request.headers.get("Depth", "1") - full_path = unquote(full_path).strip("/") + full_path_str = unquote(full_path).strip("/") body = await request.body() requested_props = parse_propfind_body(body) - resource, parent, exists = await resolve_path(full_path, current_user) + resource, parent_folder, exists = await resolve_path(full_path_str, current_user) - if not exists and resource is None: + if not exists: raise HTTPException(status_code=404, detail="Not found") multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"}) + base_href = f"/webdav/{full_path_str}" if full_path_str else "/webdav/" - base_href = f"/webdav/{full_path}" if full_path else "/webdav/" - - if resource is None: + # This function adds a single resource to the multistatus response + async def add_resource_to_response(res, res_href): response = ET.SubElement(multistatus, "D:response") href = ET.SubElement(response, "D:href") - href.text = base_href if base_href.endswith("/") else base_href + "/" + href.text = res_href - props = { - "resourcetype": "collection", - "displayname": full_path.split("/")[-1] if full_path else "Root", - "creationdate": datetime.now(), - "getlastmodified": datetime.now(), - } - response.append(create_propstat_element(props)) + props = {} + custom_props = None + res_type = "" + res_id = None - if depth in ["1", "infinity"]: - folders = await Folder.filter( - owner=current_user, parent=parent, is_deleted=False - ) - files = await File.filter( - owner=current_user, parent=parent, is_deleted=False - ) + if isinstance(res, Folder): + res_type, res_id = "folder", res.id + props = { + "resourcetype": "collection", "displayname": res.name, + "creationdate": res.created_at, "getlastmodified": res.updated_at, + } + elif isinstance(res, File): + res_type, res_id = "file", res.id + props = { + "resourcetype": "", "displayname": res.name, + "getcontentlength": res.size, "getcontenttype": res.mime_type, + "creationdate": res.created_at, "getlastmodified": res.updated_at, + "getetag": f'"{res.file_hash}"', + } + elif res is None and (full_path_str == "" or isinstance(resource, Folder)): # Root or empty folder + props = { + "resourcetype": "collection", "displayname": resource.name if resource else "Root", + "creationdate": resource.created_at if resource else datetime.now(), + "getlastmodified": resource.updated_at if resource else datetime.now(), + } - for folder in folders: - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = build_href(base_href, folder.name, True) + if res_type and (requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props))): + custom_props = await get_custom_properties(res_type, res_id) - props = { - "resourcetype": "collection", - "displayname": folder.name, - "creationdate": folder.created_at, - "getlastmodified": folder.updated_at, - } - custom_props = ( - await get_custom_properties("folder", folder.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) - response.append(create_propstat_element(props, custom_props)) - - for file in files: - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = build_href(base_href, file.name, False) - - props = { - "resourcetype": "", - "displayname": file.name, - "getcontentlength": file.size, - "getcontenttype": file.mime_type, - "creationdate": file.created_at, - "getlastmodified": file.updated_at, - "getetag": f'"{file.file_hash}"', - } - custom_props = ( - await get_custom_properties("file", file.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) - response.append(create_propstat_element(props, custom_props)) - - elif isinstance(resource, Folder): - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = base_href if base_href.endswith("/") else base_href + "/" - - props = { - "resourcetype": "collection", - "displayname": resource.name, - "creationdate": resource.created_at, - "getlastmodified": resource.updated_at, - } - custom_props = ( - await get_custom_properties("folder", resource.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) response.append(create_propstat_element(props, custom_props)) - if depth in ["1", "infinity"]: - folders = await Folder.filter( - owner=current_user, parent=resource, is_deleted=False - ) - files = await File.filter( - owner=current_user, parent=resource, is_deleted=False - ) + # Add the main resource itself to the response + res_href = base_href if isinstance(resource, File) else (base_href if base_href.endswith('/') else base_href + '/') + await add_resource_to_response(resource, res_href) - for folder in folders: - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = build_href(base_href, folder.name, True) + # If depth is 1 or infinity, add children + if depth in ["1", "infinity"]: + target_folder = resource if isinstance(resource, Folder) else (None if full_path_str == "" else parent_folder) + + folders = await Folder.filter(owner=current_user, parent=target_folder, is_deleted=False) + files = await File.filter(owner=current_user, parent=target_folder, is_deleted=False) - props = { - "resourcetype": "collection", - "displayname": folder.name, - "creationdate": folder.created_at, - "getlastmodified": folder.updated_at, - } - custom_props = ( - await get_custom_properties("folder", folder.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) - response.append(create_propstat_element(props, custom_props)) - - for file in files: - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = build_href(base_href, file.name, False) - - props = { - "resourcetype": "", - "displayname": file.name, - "getcontentlength": file.size, - "getcontenttype": file.mime_type, - "creationdate": file.created_at, - "getlastmodified": file.updated_at, - "getetag": f'"{file.file_hash}"', - } - custom_props = ( - await get_custom_properties("file", file.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) - response.append(create_propstat_element(props, custom_props)) - - elif isinstance(resource, File): - response = ET.SubElement(multistatus, "D:response") - href = ET.SubElement(response, "D:href") - href.text = base_href - - props = { - "resourcetype": "", - "displayname": resource.name, - "getcontentlength": resource.size, - "getcontenttype": resource.mime_type, - "creationdate": resource.created_at, - "getlastmodified": resource.updated_at, - "getetag": f'"{resource.file_hash}"', - } - custom_props = ( - await get_custom_properties("file", resource.id) - if requested_props == "allprop" - or ( - isinstance(requested_props, list) - and any(ns != "DAV:" for ns, _ in requested_props) - ) - else None - ) - response.append(create_propstat_element(props, custom_props)) + for folder in folders: + child_href = build_href(base_href, folder.name, True) + await add_resource_to_response(folder, child_href) + for file in files: + child_href = build_href(base_href, file.name, False) + await add_resource_to_response(file, child_href) xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True) - return Response( - content=xml_content, - media_type="application/xml; charset=utf-8", - status_code=207, - ) + return Response(content=xml_content, media_type="application/xml; charset=utf-8", status_code=207) @router.api_route("/{full_path:path}", methods=["GET", "HEAD"]) @@ -416,10 +320,9 @@ async def handle_get( request: Request, full_path: str, current_user: User = Depends(webdav_auth) ): full_path = unquote(full_path).strip("/") + resource, _, exists = await resolve_path(full_path, current_user) - resource, parent, exists = await resolve_path(full_path, current_user) - - if not isinstance(resource, File): + if not exists or not isinstance(resource, File): raise HTTPException(status_code=404, detail="File not found") try: @@ -430,9 +333,7 @@ async def handle_get( "Content-Length": str(resource.size), "Content-Type": resource.mime_type, "ETag": f'"{resource.file_hash}"', - "Last-Modified": resource.updated_at.strftime( - "%a, %d %b %Y %H:%M:%S GMT" - ), + "Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"), }, ) @@ -446,9 +347,7 @@ async def handle_get( headers={ "Content-Disposition": f'attachment; filename="{resource.name}"', "ETag": f'"{resource.file_hash}"', - "Last-Modified": resource.updated_at.strftime( - "%a, %d %b %Y %H:%M:%S GMT" - ), + "Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"), }, ) except FileNotFoundError: @@ -460,18 +359,19 @@ async def handle_put( request: Request, full_path: str, current_user: User = Depends(webdav_auth) ): full_path = unquote(full_path).strip("/") - if not full_path: raise HTTPException(status_code=400, detail="Cannot PUT to root") parts = [p for p in full_path.split("/") if p] file_name = parts[-1] + parent_path = "/".join(parts[:-1]) - parent_path = "/".join(parts[:-1]) if len(parts) > 1 else "" - _, parent_folder, exists = await resolve_path(parent_path, current_user) + parent_resource, _, parent_exists = await resolve_path(parent_path, current_user) - if not exists: - raise HTTPException(status_code=409, detail="Parent folder does not exist") + if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)): + raise HTTPException(status_code=409, detail="Parent collection does not exist") + + parent_folder = parent_resource if isinstance(parent_resource, Folder) else None file_content = await request.body() file_size = len(file_content) @@ -487,8 +387,7 @@ async def handle_put( await storage_manager.save_file(current_user.id, storage_path, file_content) mime_type, _ = mimetypes.guess_type(file_name) - if not mime_type: - mime_type = "application/octet-stream" + mime_type = mime_type or "application/octet-stream" existing_file = await File.get_or_none( name=file_name, parent=parent_folder, owner=current_user, is_deleted=False @@ -496,34 +395,21 @@ async def handle_put( if existing_file: old_size = existing_file.size - existing_file.path = storage_path - existing_file.size = file_size - existing_file.mime_type = mime_type - existing_file.file_hash = file_hash + existing_file.path, existing_file.size, existing_file.mime_type, existing_file.file_hash = \ + storage_path, file_size, mime_type, file_hash existing_file.updated_at = datetime.now() await existing_file.save() - - current_user.used_storage_bytes = ( - current_user.used_storage_bytes - old_size + file_size - ) + current_user.used_storage_bytes += (file_size - old_size) await current_user.save() - await log_activity(current_user, "file_updated", "file", existing_file.id) return Response(status_code=204) else: db_file = await File.create( - name=file_name, - path=storage_path, - size=file_size, - mime_type=mime_type, - file_hash=file_hash, - owner=current_user, - parent=parent_folder, + name=file_name, path=storage_path, size=file_size, mime_type=mime_type, + file_hash=file_hash, owner=current_user, parent=parent_folder ) - current_user.used_storage_bytes += file_size await current_user.save() - await log_activity(current_user, "file_created", "file", db_file.id) return Response(status_code=201) @@ -533,13 +419,12 @@ async def handle_delete( request: Request, full_path: str, current_user: User = Depends(webdav_auth) ): full_path = unquote(full_path).strip("/") - if not full_path: raise HTTPException(status_code=400, detail="Cannot DELETE root") - resource, parent, exists = await resolve_path(full_path, current_user) + resource, _, exists = await resolve_path(full_path, current_user) - if not resource: + if not exists: raise HTTPException(status_code=404, detail="Resource not found") if isinstance(resource, File): @@ -548,7 +433,13 @@ async def handle_delete( await resource.save() await log_activity(current_user, "file_deleted", "file", resource.id) elif isinstance(resource, Folder): + child_files = await File.filter(parent=resource, is_deleted=False).count() + child_folders = await Folder.filter(parent=resource, is_deleted=False).count() + if child_files > 0 or child_folders > 0: + raise HTTPException(status_code=409, detail="Folder is not empty") + resource.is_deleted = True + resource.deleted_at = datetime.now() await resource.save() await log_activity(current_user, "folder_deleted", "folder", resource.id) @@ -560,30 +451,25 @@ async def handle_mkcol( request: Request, full_path: str, current_user: User = Depends(webdav_auth) ): full_path = unquote(full_path).strip("/") - if not full_path: - raise HTTPException(status_code=400, detail="Cannot MKCOL at root") + raise HTTPException(status_code=405, detail="Cannot MKCOL at root") + + resource, _, exists = await resolve_path(full_path, current_user) + if exists: + raise HTTPException(status_code=405, detail="Resource already exists") parts = [p for p in full_path.split("/") if p] folder_name = parts[-1] + parent_path = "/".join(parts[:-1]) - parent_path = "/".join(parts[:-1]) if len(parts) > 1 else "" - _, parent_folder, exists = await resolve_path(parent_path, current_user) + parent_resource, _, parent_exists = await resolve_path(parent_path, current_user) - if not exists: - raise HTTPException(status_code=409, detail="Parent folder does not exist") - - existing = await Folder.get_or_none( - name=folder_name, parent=parent_folder, owner=current_user, is_deleted=False - ) - - if existing: - raise HTTPException(status_code=405, detail="Folder already exists") - - folder = await Folder.create( - name=folder_name, parent=parent_folder, owner=current_user - ) + if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)): + raise HTTPException(status_code=409, detail="Parent collection does not exist") + parent_folder = parent_resource if isinstance(parent_resource, Folder) else None + + folder = await Folder.create(name=folder_name, parent=parent_folder, owner=current_user) await log_activity(current_user, "folder_created", "folder", folder.id) return Response(status_code=201) @@ -594,55 +480,52 @@ async def handle_copy( ): full_path = unquote(full_path).strip("/") destination = request.headers.get("Destination") - overwrite = request.headers.get("Overwrite", "T") + overwrite = request.headers.get("Overwrite", "T").upper() if not destination: raise HTTPException(status_code=400, detail="Destination header required") - dest_path = unquote(urlparse(destination).path) - dest_path = dest_path.replace("/webdav/", "").strip("/") + dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/") + source_resource, _, source_exists = await resolve_path(full_path, current_user) - source_resource, _, exists = await resolve_path(full_path, current_user) - - if not source_resource: + if not source_exists: raise HTTPException(status_code=404, detail="Source not found") - if not isinstance(source_resource, File): raise HTTPException(status_code=501, detail="Only file copy is implemented") - dest_parts = [p for p in dest_path.split("/") if p] - dest_name = dest_parts[-1] - dest_parent_path = "/".join(dest_parts[:-1]) if len(dest_parts) > 1 else "" + dest_name = dest_path.split("/")[-1] + dest_parent_path = "/".join(dest_path.split("/")[:-1]) + + dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user) - _, dest_parent, exists = await resolve_path(dest_parent_path, current_user) + if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)): + raise HTTPException(status_code=409, detail="Destination parent collection does not exist") + + dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None - if not exists: - raise HTTPException(status_code=409, detail="Destination parent does not exist") + existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user) - existing_dest = await File.get_or_none( - name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False - ) + if existing_dest_exists and overwrite == "F": + raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'") - if existing_dest and overwrite == "F": - raise HTTPException( - status_code=412, detail="Destination exists and overwrite is false" + if existing_dest_exists: + # Update existing_dest instead of deleting and recreating + existing_dest.path = source_resource.path + existing_dest.size = source_resource.size + existing_dest.mime_type = source_resource.mime_type + existing_dest.file_hash = source_resource.file_hash + existing_dest.updated_at = datetime.now() + await existing_dest.save() + await log_activity(current_user, "file_copied", "file", existing_dest.id) + return Response(status_code=204) + else: + new_file = await File.create( + name=dest_name, path=source_resource.path, size=source_resource.size, + mime_type=source_resource.mime_type, file_hash=source_resource.file_hash, + owner=current_user, parent=dest_parent_folder ) - - if existing_dest: - await existing_dest.delete() - - new_file = await File.create( - name=dest_name, - path=source_resource.path, - size=source_resource.size, - mime_type=source_resource.mime_type, - file_hash=source_resource.file_hash, - owner=current_user, - parent=dest_parent, - ) - - await log_activity(current_user, "file_copied", "file", new_file.id) - return Response(status_code=201 if not existing_dest else 204) + await log_activity(current_user, "file_copied", "file", new_file.id) + return Response(status_code=201) @router.api_route("/{full_path:path}", methods=["MOVE"]) @@ -651,68 +534,75 @@ async def handle_move( ): full_path = unquote(full_path).strip("/") destination = request.headers.get("Destination") - overwrite = request.headers.get("Overwrite", "T") + overwrite = request.headers.get("Overwrite", "T").upper() if not destination: raise HTTPException(status_code=400, detail="Destination header required") - dest_path = unquote(urlparse(destination).path) - dest_path = dest_path.replace("/webdav/", "").strip("/") + dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/") + source_resource, _, source_exists = await resolve_path(full_path, current_user) - source_resource, _, exists = await resolve_path(full_path, current_user) - - if not source_resource: + if not source_exists: raise HTTPException(status_code=404, detail="Source not found") - dest_parts = [p for p in dest_path.split("/") if p] - dest_name = dest_parts[-1] - dest_parent_path = "/".join(dest_parts[:-1]) if len(dest_parts) > 1 else "" + dest_name = dest_path.split("/")[-1] + dest_parent_path = "/".join(dest_path.split("/")[:-1]) + + dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user) - _, dest_parent, exists = await resolve_path(dest_parent_path, current_user) + if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)): + raise HTTPException(status_code=409, detail="Destination parent collection does not exist") + + dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None - if not exists: - raise HTTPException(status_code=409, detail="Destination parent does not exist") + existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user) - if isinstance(source_resource, File): - existing_dest = await File.get_or_none( - name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False - ) + if existing_dest_exists and overwrite == "F": + raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'") - if existing_dest and overwrite == "F": - raise HTTPException( - status_code=412, detail="Destination exists and overwrite is false" - ) - - if existing_dest: - await existing_dest.delete() - - source_resource.name = dest_name - source_resource.parent = dest_parent - await source_resource.save() - - await log_activity(current_user, "file_moved", "file", source_resource.id) - return Response(status_code=201 if not existing_dest else 204) - - elif isinstance(source_resource, Folder): - existing_dest = await Folder.get_or_none( - name=dest_name, parent=dest_parent, owner=current_user, is_deleted=False - ) - - if existing_dest and overwrite == "F": - raise HTTPException( - status_code=412, detail="Destination exists and overwrite is false" - ) - - if existing_dest: - existing_dest.is_deleted = True + # Handle overwrite scenario + if existing_dest_exists: + if isinstance(source_resource, File): + # Update existing_dest with source_resource's properties + existing_dest.name = dest_name + existing_dest.path = source_resource.path + existing_dest.size = source_resource.size + existing_dest.mime_type = source_resource.mime_type + existing_dest.file_hash = source_resource.file_hash + existing_dest.parent = dest_parent_folder + existing_dest.updated_at = datetime.now() await existing_dest.save() - - source_resource.name = dest_name - source_resource.parent = dest_parent + await log_activity(current_user, "file_moved_overwrite", "file", existing_dest.id) + elif isinstance(source_resource, Folder): + raise HTTPException(status_code=501, detail="Folder move overwrite not implemented") + + # Mark source as deleted + source_resource.is_deleted = True + source_resource.deleted_at = datetime.now() await source_resource.save() + await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id) - await log_activity(current_user, "folder_moved", "folder", source_resource.id) - return Response(status_code=201 if not existing_dest else 204) + return Response(status_code=204) # Overwrite means 204 No Content + + # Handle non-overwrite scenario (create new resource at destination) + else: + if isinstance(source_resource, File): + new_file = await File.create( + name=dest_name, path=source_resource.path, size=source_resource.size, + mime_type=source_resource.mime_type, file_hash=source_resource.file_hash, + owner=current_user, parent=dest_parent_folder + ) + await log_activity(current_user, "file_moved_created", "file", new_file.id) + elif isinstance(source_resource, Folder): + raise HTTPException(status_code=501, detail="Folder move not implemented") + + # Mark source as deleted + source_resource.is_deleted = True + source_resource.deleted_at = datetime.now() + await source_resource.save() + await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id) + + return Response(status_code=201) # New resource created means 201 Created @router.api_route("/{full_path:path}", methods=["LOCK"]) diff --git a/tests/test_webdav.py b/tests/test_webdav.py new file mode 100644 index 0000000..2e0fab1 --- /dev/null +++ b/tests/test_webdav.py @@ -0,0 +1,735 @@ +import pytest +import pytest_asyncio +from httpx import AsyncClient, ASGITransport +from fastapi import status +from mywebdav.main import app +from mywebdav.models import User, Folder, File, WebDAVProperty +from mywebdav.auth import get_password_hash +import base64 +from datetime import datetime + + +@pytest_asyncio.fixture +async def test_user(): + user = await User.create( + username="testuser", + email="test@example.com", + hashed_password=get_password_hash("testpass"), + is_active=True, + is_superuser=False, + ) + yield user + await user.delete() + + +@pytest_asyncio.fixture +async def test_folder(test_user): + folder = await Folder.create( + name="testfolder", + owner=test_user, + ) + yield folder + await folder.delete() + + +@pytest_asyncio.fixture +async def test_file(test_user, test_folder): + file = await File.create( + name="testfile.txt", + path="testfile.txt", + size=13, + mime_type="text/plain", + file_hash="dummyhash", + owner=test_user, + parent=test_folder, + ) + yield file + await file.delete() + + +def get_basic_auth_header(username, password): + credentials = base64.b64encode(f"{username}:{password}".encode()).decode() + return {"Authorization": f"Basic {credentials}"} + + +@pytest.mark.asyncio +async def test_webdav_options(): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.options("/webdav/") + + assert response.status_code == status.HTTP_200_OK + assert "DAV" in response.headers + assert "Allow" in response.headers + + +@pytest.mark.asyncio +async def test_webdav_propfind_root_unauthorized(): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + "/webdav/", + headers={"Depth": "1"}, + ) + + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +@pytest.mark.asyncio +async def test_webdav_propfind_root(test_user): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + "/webdav/", + headers={ + "Depth": "1", + **get_basic_auth_header("testuser", "testpass") + }, + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + # Parse XML response + content = response.text + assert "" in content + + +@pytest.mark.asyncio +async def test_webdav_propfind_folder(test_user, test_folder): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + f"/webdav/{test_folder.name}/", + headers={ + "Depth": "1", + **get_basic_auth_header("testuser", "testpass") + }, + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + content = response.text + assert test_folder.name in content + + +@pytest.mark.asyncio +async def test_webdav_propfind_file(test_user, test_file): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Depth": "0", + **get_basic_auth_header("testuser", "testpass") + }, + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + content = response.text + assert test_file.name in content + assert "text/plain" in content + + +@pytest.mark.asyncio +async def test_webdav_get_file(test_user, test_file): + # Mock storage manager to return file content + from mywebdav import storage + original_get_file = storage.storage_manager.get_file + + async def mock_get_file(user_id, path): + if path == test_file.path: + yield b"Hello, World!" + else: + raise FileNotFoundError() + + storage.storage_manager.get_file = mock_get_file + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.get( + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers=get_basic_auth_header("testuser", "testpass") + ) + + assert response.status_code == status.HTTP_200_OK + assert response.content == b"Hello, World!" + finally: + storage.storage_manager.get_file = original_get_file + + +@pytest.mark.asyncio +async def test_webdav_put_file(test_user, test_folder): + from mywebdav import storage + original_save_file = storage.storage_manager.save_file + + saved_content = None + saved_path = None + + async def mock_save_file(user_id, path, content): + nonlocal saved_content, saved_path + saved_content = content + saved_path = path + + storage.storage_manager.save_file = mock_save_file + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.put( + f"/webdav/{test_folder.name}/newfile.txt", + content=b"New file content", + headers=get_basic_auth_header("testuser", "testpass") + ) + + assert response.status_code == status.HTTP_201_CREATED + assert saved_content == b"New file content" + + # Check if file was created in DB + file = await File.get_or_none( + name="newfile.txt", parent=test_folder, owner=test_user, is_deleted=False + ) + assert file is not None + assert file.path == saved_path + await file.delete() + finally: + storage.storage_manager.save_file = original_save_file + + +@pytest.mark.asyncio +async def test_webdav_mkcol(test_user): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MKCOL", + "/webdav/newfolder/", + headers=get_basic_auth_header("testuser", "testpass") + ) + + assert response.status_code == status.HTTP_201_CREATED + + # Check if folder was created + folder = await Folder.get_or_none( + name="newfolder", parent=None, owner=test_user, is_deleted=False + ) + assert folder is not None + await folder.delete() + + +@pytest.mark.asyncio +async def test_webdav_delete_file(test_user, test_file): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.delete( + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers=get_basic_auth_header("testuser", "testpass") + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # Check if file was marked as deleted + updated_file = await File.get(id=test_file.id) + assert updated_file.is_deleted == True + + +@pytest.mark.asyncio +async def test_webdav_delete_folder(test_user, test_folder): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.delete( + f"/webdav/{test_folder.name}/", + headers=get_basic_auth_header("testuser", "testpass") + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # Check if folder was marked as deleted + updated_folder = await Folder.get(id=test_folder.id) + assert updated_folder.is_deleted == True + + +@pytest.mark.asyncio +async def test_webdav_copy_file(test_user, test_file): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "COPY", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{test_file.parent.name}/copied_{test_file.name}", + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_201_CREATED + + # Check if copy was created + copied_file = await File.get_or_none( + name=f"copied_{test_file.name}", parent=test_file.parent, owner=test_user, is_deleted=False + ) + assert copied_file is not None + await copied_file.delete() + + +@pytest.mark.asyncio +async def test_webdav_move_file(test_user, test_file, test_folder): + # Create another folder + dest_folder = await Folder.create( + name="destfolder", + owner=test_user, + ) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MOVE", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}", + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_201_CREATED + + # Check if file was moved + moved_file = await File.get_or_none( + name=test_file.name, parent=dest_folder, owner=test_user, is_deleted=False + ) + assert moved_file is not None + + # Original should be gone + original_file = await File.get_or_none( + id=test_file.id, is_deleted=False + ) + assert original_file is None + finally: + await dest_folder.delete() + + +@pytest.mark.asyncio +async def test_webdav_lock_unlock(test_user, test_file): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + # Lock + lock_xml = """ + + + + """ + + response = await client.request( + "LOCK", + f"/webdav/{test_file.parent.name}/{test_file.name}", + content=lock_xml, + headers={ + "Content-Type": "application/xml", + "Timeout": "Second-3600", + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_200_OK + lock_token = response.headers.get("Lock-Token") + assert lock_token is not None + + # Unlock + response = await client.request( + "UNLOCK", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Lock-Token": lock_token, + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + +@pytest.mark.asyncio +async def test_webdav_propfind_allprop(test_user, test_file): + propfind_xml = """ + + + """ + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + f"/webdav/{test_file.parent.name}/{test_file.name}", + content=propfind_xml, + headers={ + "Content-Type": "application/xml", + "Depth": "0", + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + content = response.text + assert "getcontentlength" in content + assert "getcontenttype" in content + + +@pytest.mark.asyncio +async def test_webdav_proppatch(test_user, test_file): + proppatch_xml = """ + + + + Test Author + + + """ + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPPATCH", + f"/webdav/{test_file.parent.name}/{test_file.name}", + content=proppatch_xml, + headers={ + "Content-Type": "application/xml", + **get_basic_auth_header("testuser", "testpass") + } + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + + # Check if property was set + from mywebdav.models import WebDAVProperty + prop = await WebDAVProperty.get_or_none( + resource_type="file", + resource_id=test_file.id, + namespace="http://example.com", + name="author" + ) + await prop.delete() + + +@pytest.mark.asyncio +async def test_webdav_head_file(test_user, test_file): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.head( + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers=get_basic_auth_header("testuser", "testpass"), + ) + + assert response.status_code == status.HTTP_200_OK + assert response.headers["Content-Length"] == str(test_file.size) + assert response.headers["Content-Type"] == test_file.mime_type + assert response.content == b"" + + +@pytest.mark.asyncio +async def test_webdav_put_update_file(test_user, test_file): + from mywebdav import storage + original_save_file = storage.storage_manager.save_file + saved_content = None + async def mock_save_file(user_id, path, content): + nonlocal saved_content + saved_content = content + storage.storage_manager.save_file = mock_save_file + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.put( + f"/webdav/{test_file.parent.name}/{test_file.name}", + content=b"Updated content", + headers=get_basic_auth_header("testuser", "testpass"), + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + assert saved_content == b"Updated content" + + updated_file = await File.get(id=test_file.id) + assert updated_file.size == len(b"Updated content") + finally: + storage.storage_manager.save_file = original_save_file + + +@pytest.mark.asyncio +async def test_webdav_copy_file_overwrite_true(test_user, test_file): + dest_file = await File.create( + name="destination.txt", parent=test_file.parent, owner=test_user, + path="dest.txt", size=1, mime_type="text/plain", file_hash="oldhash" + ) + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "COPY", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{test_file.parent.name}/destination.txt", + "Overwrite": "T", + **get_basic_auth_header("testuser", "testpass"), + }, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # The original destination file should have been updated + updated_dest_file = await File.get(id=dest_file.id) + assert updated_dest_file.is_deleted == False + assert updated_dest_file.file_hash == test_file.file_hash + assert updated_dest_file.size == test_file.size + assert updated_dest_file.name == dest_file.name # Name should remain the same + assert updated_dest_file.parent_id == test_file.parent_id # Parent should remain the same + + # No new file should have been created with the destination name + new_file_check = await File.get_or_none(name="destination.txt", parent=test_file.parent, is_deleted=False) + assert new_file_check.id == updated_dest_file.id # Should be the same updated file + + + finally: + await dest_file.delete() + + +@pytest.mark.asyncio +async def test_webdav_move_file_overwrite_true(test_user, test_file): + dest_folder = await Folder.create(name="destfolder", owner=test_user) + existing_dest_file = await File.create( + name=test_file.name, parent=dest_folder, owner=test_user, + path="existing.txt", size=1, mime_type="text/plain", file_hash="oldhash" + ) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MOVE", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}", + "Overwrite": "T", + **get_basic_auth_header("testuser", "testpass"), + }, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # The original source file should be marked as deleted + original_source_file = await File.get_or_none(id=test_file.id, is_deleted=True) + assert original_source_file is not None + + # The existing destination file should have been updated + updated_dest_file = await File.get(id=existing_dest_file.id) + assert updated_dest_file.is_deleted == False + assert updated_dest_file.file_hash == test_file.file_hash # Should have source's hash + assert updated_dest_file.size == test_file.size # Should have source's size + assert updated_dest_file.name == existing_dest_file.name # Name should remain the same + assert updated_dest_file.parent_id == dest_folder.id # Parent should remain the same + + # No new file should have been created with the destination name + new_file_check = await File.get_or_none(name=test_file.name, parent=dest_folder, is_deleted=False) + assert new_file_check.id == updated_dest_file.id # Should be the same updated file + + finally: + await dest_folder.delete() + await existing_dest_file.delete() + + +@pytest.mark.asyncio +async def test_webdav_proppatch_remove(test_user, test_file): + # First, set a property + prop = await WebDAVProperty.create( + resource_type="file", resource_id=test_file.id, + namespace="http://example.com", name="author", value="Test Author" + ) + + # Now, remove it + proppatch_xml = """ + + + + + + + """ + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPPATCH", + f"/webdav/{test_file.parent.name}/{test_file.name}", + content=proppatch_xml, + headers={ + "Content-Type": "application/xml", + **get_basic_auth_header("testuser", "testpass"), + }, + ) + + assert response.status_code == status.HTTP_207_MULTI_STATUS + assert "200 OK" in response.text + + # Check if property was removed + removed_prop = await WebDAVProperty.get_or_none(id=prop.id) + assert removed_prop is None + + + +@pytest.mark.asyncio +async def test_webdav_propfind_not_found(test_user): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "PROPFIND", + "/webdav/nonexistentfolder/", + headers={ + "Depth": "1", + **get_basic_auth_header("testuser", "testpass") + }, + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + +@pytest.mark.asyncio +async def test_webdav_mkcol_nested_fail(test_user): + """Test creating a nested directory where the parent does not exist.""" + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MKCOL", + "/webdav/parent/newfolder/", + headers=get_basic_auth_header("testuser", "testpass"), + ) + # Expect 409 Conflict because parent collection does not exist + assert response.status_code == status.HTTP_409_CONFLICT + + +@pytest.mark.asyncio +async def test_webdav_mkcol_already_exists(test_user, test_folder): + """Test creating a directory that already exists.""" + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MKCOL", + f"/webdav/{test_folder.name}/", + headers=get_basic_auth_header("testuser", "testpass"), + ) + # Expect 405 Method Not Allowed if collection already exists + assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED + + +@pytest.mark.asyncio +async def test_webdav_delete_non_existent_file(test_user): + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.delete( + "/webdav/nonexistent.txt", + headers=get_basic_auth_header("testuser", "testpass"), + ) + assert response.status_code == status.HTTP_404_NOT_FOUND + + +@pytest.mark.asyncio +async def test_webdav_delete_non_empty_folder(test_user, test_file): + """A non-empty folder cannot be deleted.""" + folder_to_delete = test_file.parent + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.delete( + f"/webdav/{folder_to_delete.name}/", + headers=get_basic_auth_header("testuser", "testpass"), + ) + # Expect 409 Conflict as the folder is not empty + assert response.status_code == status.HTTP_409_CONFLICT + + +@pytest.mark.asyncio +async def test_webdav_copy_file_overwrite_false_fail(test_user, test_file): + # Create a destination file that already exists + dest_file = await File.create( + name=f"copied_{test_file.name}", + path=f"copied_{test_file.name}", + size=1, + mime_type="text/plain", + file_hash="dummyhash2", + owner=test_user, + parent=test_file.parent, + ) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "COPY", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{test_file.parent.name}/{dest_file.name}", + "Overwrite": "F", + **get_basic_auth_header("testuser", "testpass"), + }, + ) + # 412 Precondition Failed because Overwrite is 'F' and destination exists + assert response.status_code == status.HTTP_412_PRECONDITION_FAILED + finally: + await dest_file.delete() + + +@pytest.mark.asyncio +async def test_webdav_move_file_overwrite_false_fail(test_user, test_file): + dest_folder = await Folder.create(name="destfolder", owner=test_user) + # Create a file with the same name at the destination + existing_dest_file = await File.create( + name=test_file.name, + path=f"{dest_folder.name}/{test_file.name}", + size=1, + mime_type="text/plain", + file_hash="dummyhash3", + owner=test_user, + parent=dest_folder, + ) + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as client: + response = await client.request( + "MOVE", + f"/webdav/{test_file.parent.name}/{test_file.name}", + headers={ + "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}", + "Overwrite": "F", + **get_basic_auth_header("testuser", "testpass"), + }, + ) + # 412 Precondition Failed because Overwrite is 'F' and destination exists + assert response.status_code == status.HTTP_412_PRECONDITION_FAILED + finally: + await dest_folder.delete() + await existing_dest_file.delete() \ No newline at end of file