import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from fastapi import status
from mywebdav.main import app
from mywebdav.models import User, Folder, File, WebDAVProperty
from mywebdav.auth import get_password_hash
import base64
from datetime import datetime
@pytest_asyncio.fixture
async def test_user():
    """Provide a persisted, active, non-superuser account for one test.

    The row is created before the test body runs and deleted once the
    test has finished.
    """
    account_fields = {
        "username": "testuser",
        "email": "test@example.com",
        "hashed_password": get_password_hash("testpass"),
        "is_active": True,
        "is_superuser": False,
    }
    account = await User.create(**account_fields)
    yield account
    # Teardown: remove the row created above.
    await account.delete()
@pytest_asyncio.fixture
async def test_folder(test_user):
    """Provide a folder named ``testfolder`` owned by ``test_user``.

    The folder is deleted again after the test has finished.
    """
    folder_fields = {"name": "testfolder", "owner": test_user}
    created_folder = await Folder.create(**folder_fields)
    yield created_folder
    # Teardown: remove the row created above.
    await created_folder.delete()
@pytest_asyncio.fixture
async def test_file(test_user, test_folder):
    """Provide a ``testfile.txt`` record owned by ``test_user`` inside ``test_folder``.

    Only the database row is created here; the record is deleted again
    after the test has finished.
    """
    file_fields = {
        "name": "testfile.txt",
        "path": "testfile.txt",
        "size": 13,
        "mime_type": "text/plain",
        "file_hash": "dummyhash",
        "owner": test_user,
        "parent": test_folder,
    }
    created_file = await File.create(**file_fields)
    yield created_file
    # Teardown: remove the row created above.
    await created_file.delete()
def get_basic_auth_header(username, password):
    """Build an HTTP Basic ``Authorization`` header for the given credentials.

    Returns a one-entry dict whose value is ``"Basic "`` followed by the
    base64 encoding of ``"<username>:<password>"``.
    """
    raw_pair = f"{username}:{password}".encode()
    token = base64.b64encode(raw_pair).decode()
    return {"Authorization": f"Basic {token}"}
@pytest.mark.asyncio
async def test_webdav_options():
    """OPTIONS on the WebDAV root answers 200 and sends DAV and Allow headers."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.options("/webdav/")
    assert reply.status_code == status.HTTP_200_OK
    for header_name in ("DAV", "Allow"):
        assert header_name in reply.headers
@pytest.mark.asyncio
async def test_webdav_propfind_root_unauthorized():
    """PROPFIND without credentials is rejected with 401 and a Basic challenge."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("PROPFIND", "/webdav/", headers={"Depth": "1"})
    assert reply.status_code == status.HTTP_401_UNAUTHORIZED
    assert "WWW-Authenticate" in reply.headers
    challenge = reply.headers["WWW-Authenticate"]
    assert 'Basic realm="MyWebdav WebDAV"' in challenge
@pytest.mark.asyncio
async def test_webdav_propfind_root(test_user):
    """An authenticated Depth-1 PROPFIND on the root yields a 207 multistatus body."""
    request_headers = {
        "Depth": "1",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("PROPFIND", "/webdav/", headers=request_headers)
    assert reply.status_code == status.HTTP_207_MULTI_STATUS
    # The body is inspected as raw text; it is not parsed as XML.
    body = reply.text
    for marker in ("<D:multistatus", "<D:response>"):
        assert marker in body
@pytest.mark.asyncio
async def test_webdav_propfind_folder(test_user, test_folder):
    """A Depth-1 PROPFIND on an existing folder returns 207 and mentions its name."""
    target = f"/webdav/{test_folder.name}/"
    request_headers = {
        "Depth": "1",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("PROPFIND", target, headers=request_headers)
    assert reply.status_code == status.HTTP_207_MULTI_STATUS
    assert test_folder.name in reply.text
@pytest.mark.asyncio
async def test_webdav_propfind_file(test_user, test_file):
    """A Depth-0 PROPFIND on a file returns 207 with its name and MIME type."""
    target = f"/webdav/{test_file.parent.name}/{test_file.name}"
    request_headers = {
        "Depth": "0",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("PROPFIND", target, headers=request_headers)
    assert reply.status_code == status.HTTP_207_MULTI_STATUS
    body = reply.text
    for expected_fragment in (test_file.name, "text/plain"):
        assert expected_fragment in body
@pytest.mark.asyncio
async def test_webdav_get_file(test_user, test_file):
    """GET on a stored file returns the bytes produced by the storage layer.

    ``storage_manager.get_file`` is replaced by a stub for the duration of
    the request and put back afterwards, even when an assertion fails.
    """
    from mywebdav import storage

    real_get_file = storage.storage_manager.get_file

    async def fake_get_file(user_id, path):
        # Async generator: only the fixture file's path yields content.
        if path != test_file.path:
            raise FileNotFoundError()
        yield b"Hello, World!"

    target = f"/webdav/{test_file.parent.name}/{test_file.name}"
    storage.storage_manager.get_file = fake_get_file
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.get(
                target,
                headers=get_basic_auth_header("testuser", "testpass"),
            )
        assert reply.status_code == status.HTTP_200_OK
        assert reply.content == b"Hello, World!"
    finally:
        storage.storage_manager.get_file = real_get_file
@pytest.mark.asyncio
async def test_webdav_put_file(test_user, test_folder):
    """PUT to a new path returns 201, hands the body to storage and adds a File row.

    ``storage_manager.save_file`` is replaced by a recording stub for the
    duration of the test and restored afterwards.
    """
    from mywebdav import storage

    real_save_file = storage.storage_manager.save_file
    recorded = {"content": None, "path": None}

    async def fake_save_file(user_id, path, content):
        # Remember what the handler asked the storage layer to write.
        recorded["content"] = content
        recorded["path"] = path

    storage.storage_manager.save_file = fake_save_file
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.put(
                f"/webdav/{test_folder.name}/newfile.txt",
                content=b"New file content",
                headers=get_basic_auth_header("testuser", "testpass"),
            )
        assert reply.status_code == status.HTTP_201_CREATED
        assert recorded["content"] == b"New file content"
        # The upload must also be reflected as a non-deleted row in the DB.
        created = await File.get_or_none(
            name="newfile.txt",
            parent=test_folder,
            owner=test_user,
            is_deleted=False,
        )
        assert created is not None
        assert created.path == recorded["path"]
        await created.delete()
    finally:
        storage.storage_manager.save_file = real_save_file
@pytest.mark.asyncio
async def test_webdav_mkcol(test_user):
    """MKCOL on a fresh top-level path returns 201 and stores a root-level folder."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request(
            "MKCOL",
            "/webdav/newfolder/",
            headers=get_basic_auth_header("testuser", "testpass"),
        )
    assert reply.status_code == status.HTTP_201_CREATED
    # The new collection must exist as a non-deleted folder without a parent.
    lookup = {
        "name": "newfolder",
        "parent": None,
        "owner": test_user,
        "is_deleted": False,
    }
    created = await Folder.get_or_none(**lookup)
    assert created is not None
    await created.delete()
@pytest.mark.asyncio
async def test_webdav_delete_file(test_user, test_file):
    """DELETE on a file returns 204 and flags the row as deleted.

    The row is re-read by id afterwards, so it is expected to still exist
    with ``is_deleted`` set rather than being removed from the table.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.delete(
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            headers=get_basic_auth_header("testuser", "testpass"),
        )
    assert response.status_code == status.HTTP_204_NO_CONTENT
    # Re-read the row: the fixture instance does not see the handler's update.
    updated_file = await File.get(id=test_file.id)
    assert updated_file.is_deleted
@pytest.mark.asyncio
async def test_webdav_delete_folder(test_user, test_folder):
    """DELETE on an empty folder returns 204 and flags the row as deleted.

    The row is re-read by id afterwards, so it is expected to still exist
    with ``is_deleted`` set rather than being removed from the table.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.delete(
            f"/webdav/{test_folder.name}/",
            headers=get_basic_auth_header("testuser", "testpass"),
        )
    assert response.status_code == status.HTTP_204_NO_CONTENT
    # Re-read the row: the fixture instance does not see the handler's update.
    updated_folder = await Folder.get(id=test_folder.id)
    assert updated_folder.is_deleted
@pytest.mark.asyncio
async def test_webdav_copy_file(test_user, test_file):
    """COPY to a free name in the same folder returns 201 and creates a second row."""
    source_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    request_headers = {
        "Destination": f"http://test/webdav/{test_file.parent.name}/copied_{test_file.name}",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("COPY", source_url, headers=request_headers)
    assert reply.status_code == status.HTTP_201_CREATED
    # The copy must show up as its own non-deleted row next to the source.
    duplicate = await File.get_or_none(
        name=f"copied_{test_file.name}",
        parent=test_file.parent,
        owner=test_user,
        is_deleted=False,
    )
    assert duplicate is not None
    await duplicate.delete()
@pytest.mark.asyncio
async def test_webdav_move_file(test_user, test_file, test_folder):
    """MOVE into another folder returns 201; the file is then found only there.

    A second folder is created as the destination and removed again in
    the ``finally`` clause.
    """
    target_folder = await Folder.create(name="destfolder", owner=test_user)
    try:
        source_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
        request_headers = {
            "Destination": f"http://test/webdav/{target_folder.name}/{test_file.name}",
            **get_basic_auth_header("testuser", "testpass"),
        }
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.request("MOVE", source_url, headers=request_headers)
        assert reply.status_code == status.HTTP_201_CREATED
        # A live row with the same name must now sit in the destination folder.
        relocated = await File.get_or_none(
            name=test_file.name,
            parent=target_folder,
            owner=test_user,
            is_deleted=False,
        )
        assert relocated is not None
        # The original id must no longer resolve to a live row.
        leftover = await File.get_or_none(id=test_file.id, is_deleted=False)
        assert leftover is None
    finally:
        await target_folder.delete()
@pytest.mark.asyncio
async def test_webdav_lock_unlock(test_user, test_file):
    """LOCK returns 200 with a Lock-Token header; UNLOCK with that token returns 204."""
    resource_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    auth = get_basic_auth_header("testuser", "testpass")
    # Exclusive write lock request body.
    lock_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<D:lockinfo xmlns:D="DAV:">\n'
        "<D:lockscope><D:exclusive/></D:lockscope>\n"
        "<D:locktype><D:write/></D:locktype>\n"
        "</D:lockinfo>"
    )
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        # Step 1: acquire the lock.
        lock_reply = await http.request(
            "LOCK",
            resource_url,
            content=lock_body,
            headers={
                "Content-Type": "application/xml",
                "Timeout": "Second-3600",
                **auth,
            },
        )
        assert lock_reply.status_code == status.HTTP_200_OK
        token = lock_reply.headers.get("Lock-Token")
        assert token is not None

        # Step 2: release it again using the token the server handed out.
        unlock_reply = await http.request(
            "UNLOCK",
            resource_url,
            headers={"Lock-Token": token, **auth},
        )
        assert unlock_reply.status_code == status.HTTP_204_NO_CONTENT
@pytest.mark.asyncio
async def test_webdav_propfind_allprop(test_user, test_file):
    """PROPFIND with an ``allprop`` body returns 207 including length and type."""
    request_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<D:propfind xmlns:D="DAV:">\n'
        "<D:allprop/>\n"
        "</D:propfind>"
    )
    request_headers = {
        "Content-Type": "application/xml",
        "Depth": "0",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request(
            "PROPFIND",
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            content=request_body,
            headers=request_headers,
        )
    assert reply.status_code == status.HTTP_207_MULTI_STATUS
    body = reply.text
    for property_name in ("getcontentlength", "getcontenttype"):
        assert property_name in body
@pytest.mark.asyncio
async def test_webdav_proppatch(test_user, test_file):
    """PROPPATCH with a ``set`` instruction returns 207 and stores the property.

    The stored row is looked up by resource type, resource id, namespace
    and name, and is removed again once its existence has been asserted.
    """
    proppatch_xml = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<D:propertyupdate xmlns:D="DAV:">\n'
        "<D:set>\n"
        "<D:prop>\n"
        '<custom:author xmlns:custom="http://example.com">Test Author</custom:author>\n'
        "</D:prop>\n"
        "</D:set>\n"
        "</D:propertyupdate>"
    )
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.request(
            "PROPPATCH",
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            content=proppatch_xml,
            headers={
                "Content-Type": "application/xml",
                **get_basic_auth_header("testuser", "testpass"),
            },
        )
    assert response.status_code == status.HTTP_207_MULTI_STATUS
    # Check if property was set
    prop = await WebDAVProperty.get_or_none(
        resource_type="file",
        resource_id=test_file.id,
        namespace="http://example.com",
        name="author",
    )
    # Fail with a clear assertion instead of an AttributeError on None
    # when the handler did not persist the property.
    assert prop is not None
    await prop.delete()
@pytest.mark.asyncio
async def test_webdav_head_file(test_user, test_file):
    """HEAD on a file returns 200 with length and type headers and an empty body."""
    target = f"/webdav/{test_file.parent.name}/{test_file.name}"
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.head(
            target,
            headers=get_basic_auth_header("testuser", "testpass"),
        )
    assert reply.status_code == status.HTTP_200_OK
    reply_headers = reply.headers
    assert reply_headers["Content-Length"] == str(test_file.size)
    assert reply_headers["Content-Type"] == test_file.mime_type
    assert reply.content == b""
@pytest.mark.asyncio
async def test_webdav_put_update_file(test_user, test_file):
    """PUT onto an existing file returns 204, saves the body and updates the size.

    ``storage_manager.save_file`` is replaced by a recording stub for the
    duration of the test and restored afterwards.
    """
    from mywebdav import storage

    real_save_file = storage.storage_manager.save_file
    payload = b"Updated content"
    recorded = {"content": None}

    async def fake_save_file(user_id, path, content):
        # Remember the bytes the handler passed to the storage layer.
        recorded["content"] = content

    storage.storage_manager.save_file = fake_save_file
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.put(
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                content=payload,
                headers=get_basic_auth_header("testuser", "testpass"),
            )
        assert reply.status_code == status.HTTP_204_NO_CONTENT
        assert recorded["content"] == b"Updated content"
        # The size stored in the DB must match the length of the new body.
        refreshed = await File.get(id=test_file.id)
        assert refreshed.size == len(payload)
    finally:
        storage.storage_manager.save_file = real_save_file
@pytest.mark.asyncio
async def test_webdav_copy_file_overwrite_true(test_user, test_file):
    """COPY with ``Overwrite: T`` onto an existing file returns 204 and reuses its row.

    The pre-existing destination row must survive with the source's hash
    and size, keep its own name and parent, and remain the only live row
    carrying the destination name.
    """
    dest_file = await File.create(
        name="destination.txt", parent=test_file.parent, owner=test_user,
        path="dest.txt", size=1, mime_type="text/plain", file_hash="oldhash"
    )
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "COPY",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{test_file.parent.name}/destination.txt",
                    "Overwrite": "T",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )
        assert response.status_code == status.HTTP_204_NO_CONTENT
        # The original destination file should have been updated
        updated_dest_file = await File.get(id=dest_file.id)
        assert not updated_dest_file.is_deleted
        assert updated_dest_file.file_hash == test_file.file_hash
        assert updated_dest_file.size == test_file.size
        assert updated_dest_file.name == dest_file.name  # Name should remain the same
        assert updated_dest_file.parent_id == test_file.parent_id  # Parent should remain the same
        # No new file should have been created with the destination name
        new_file_check = await File.get_or_none(
            name="destination.txt", parent=test_file.parent, is_deleted=False
        )
        # Guard the dereference below so a missing row fails as an assertion.
        assert new_file_check is not None
        assert new_file_check.id == updated_dest_file.id  # Should be the same updated file
    finally:
        await dest_file.delete()
@pytest.mark.asyncio
async def test_webdav_move_file_overwrite_true(test_user, test_file):
    """MOVE with ``Overwrite: T`` onto an existing file returns 204 and reuses its row.

    The source row must end up flagged as deleted, while the pre-existing
    destination row takes over the source's hash and size, keeps its own
    name and parent, and remains the only live row with that name in the
    destination folder.
    """
    dest_folder = await Folder.create(name="destfolder", owner=test_user)
    existing_dest_file = await File.create(
        name=test_file.name, parent=dest_folder, owner=test_user,
        path="existing.txt", size=1, mime_type="text/plain", file_hash="oldhash"
    )
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "MOVE",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
                    "Overwrite": "T",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )
        assert response.status_code == status.HTTP_204_NO_CONTENT
        # The original source file should be marked as deleted
        original_source_file = await File.get_or_none(id=test_file.id, is_deleted=True)
        assert original_source_file is not None
        # The existing destination file should have been updated
        updated_dest_file = await File.get(id=existing_dest_file.id)
        assert not updated_dest_file.is_deleted
        assert updated_dest_file.file_hash == test_file.file_hash  # Should have source's hash
        assert updated_dest_file.size == test_file.size  # Should have source's size
        assert updated_dest_file.name == existing_dest_file.name  # Name should remain the same
        assert updated_dest_file.parent_id == dest_folder.id  # Parent should remain the same
        # No new file should have been created with the destination name
        new_file_check = await File.get_or_none(
            name=test_file.name, parent=dest_folder, is_deleted=False
        )
        # Guard the dereference below so a missing row fails as an assertion.
        assert new_file_check is not None
        assert new_file_check.id == updated_dest_file.id  # Should be the same updated file
    finally:
        # Remove the child before its parent so cleanup does not depend on
        # how the file-to-folder relation handles parent deletion.
        await existing_dest_file.delete()
        await dest_folder.delete()
@pytest.mark.asyncio
async def test_webdav_proppatch_remove(test_user, test_file):
    """PROPPATCH with a ``remove`` instruction returns 207 and deletes the property.

    A property row is created directly in the database first, then removed
    through the WebDAV endpoint.
    """
    # Arrange: a stored custom property to be removed.
    stored_prop = await WebDAVProperty.create(
        resource_type="file",
        resource_id=test_file.id,
        namespace="http://example.com",
        name="author",
        value="Test Author",
    )
    request_body = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<D:propertyupdate xmlns:D="DAV:">\n'
        "<D:remove>\n"
        "<D:prop>\n"
        '<custom:author xmlns:custom="http://example.com"/>\n'
        "</D:prop>\n"
        "</D:remove>\n"
        "</D:propertyupdate>"
    )
    request_headers = {
        "Content-Type": "application/xml",
        **get_basic_auth_header("testuser", "testpass"),
    }
    # Act: ask the server to drop the property.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request(
            "PROPPATCH",
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            content=request_body,
            headers=request_headers,
        )
    # Assert: multistatus reports success and the row is gone.
    assert reply.status_code == status.HTTP_207_MULTI_STATUS
    assert "200 OK" in reply.text
    leftover = await WebDAVProperty.get_or_none(id=stored_prop.id)
    assert leftover is None
@pytest.mark.asyncio
async def test_webdav_propfind_not_found(test_user):
    """PROPFIND on a path that does not exist returns 404."""
    request_headers = {
        "Depth": "1",
        **get_basic_auth_header("testuser", "testpass"),
    }
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request(
            "PROPFIND",
            "/webdav/nonexistentfolder/",
            headers=request_headers,
        )
    assert reply.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_webdav_mkcol_nested_fail(test_user):
    """MKCOL below a missing parent collection is rejected with 409 Conflict."""
    auth = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("MKCOL", "/webdav/parent/newfolder/", headers=auth)
    # "parent" was never created, so the nested collection cannot be made.
    assert reply.status_code == status.HTTP_409_CONFLICT
@pytest.mark.asyncio
async def test_webdav_mkcol_already_exists(test_user, test_folder):
    """MKCOL on a path that is already a collection is rejected with 405."""
    target = f"/webdav/{test_folder.name}/"
    auth = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.request("MKCOL", target, headers=auth)
    # The fixture folder already occupies this path.
    assert reply.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
@pytest.mark.asyncio
async def test_webdav_delete_non_existent_file(test_user):
    """DELETE on a path that does not exist returns 404."""
    auth = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.delete("/webdav/nonexistent.txt", headers=auth)
    assert reply.status_code == status.HTTP_404_NOT_FOUND
@pytest.mark.asyncio
async def test_webdav_delete_non_empty_folder(test_user, test_file):
    """DELETE on a folder that still holds a file is rejected with 409 Conflict."""
    # The fixture file lives in this folder, so the folder is not empty.
    target = f"/webdav/{test_file.parent.name}/"
    auth = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        reply = await http.delete(target, headers=auth)
    assert reply.status_code == status.HTTP_409_CONFLICT
@pytest.mark.asyncio
async def test_webdav_copy_file_overwrite_false_fail(test_user, test_file):
    """COPY with ``Overwrite: F`` onto an existing file is rejected with 412.

    A file occupying the destination name is created up front and removed
    again in the ``finally`` clause.
    """
    occupied_name = f"copied_{test_file.name}"
    blocker = await File.create(
        name=occupied_name,
        path=occupied_name,
        size=1,
        mime_type="text/plain",
        file_hash="dummyhash2",
        owner=test_user,
        parent=test_file.parent,
    )
    try:
        request_headers = {
            "Destination": f"http://test/webdav/{test_file.parent.name}/{blocker.name}",
            "Overwrite": "F",
            **get_basic_auth_header("testuser", "testpass"),
        }
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            reply = await http.request(
                "COPY",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers=request_headers,
            )
        # Destination exists and overwriting was explicitly refused.
        assert reply.status_code == status.HTTP_412_PRECONDITION_FAILED
    finally:
        await blocker.delete()
@pytest.mark.asyncio
async def test_webdav_move_file_overwrite_false_fail(test_user, test_file):
    """MOVE with ``Overwrite: F`` onto an existing file is rejected with 412.

    A destination folder holding a file with the same name as the source
    is created up front; both rows are removed in the ``finally`` clause.
    """
    dest_folder = await Folder.create(name="destfolder", owner=test_user)
    # Create a file with the same name at the destination
    existing_dest_file = await File.create(
        name=test_file.name,
        path=f"{dest_folder.name}/{test_file.name}",
        size=1,
        mime_type="text/plain",
        file_hash="dummyhash3",
        owner=test_user,
        parent=dest_folder,
    )
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "MOVE",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
                    "Overwrite": "F",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )
        # 412 Precondition Failed because Overwrite is 'F' and destination exists
        assert response.status_code == status.HTTP_412_PRECONDITION_FAILED
    finally:
        # Remove the child before its parent so cleanup does not depend on
        # how the file-to-folder relation handles parent deletion.
        await existing_dest_file.delete()
        await dest_folder.delete()