735 lines
24 KiB
Python
735 lines
24 KiB
Python
|
|
import pytest
|
||
|
|
import pytest_asyncio
|
||
|
|
from httpx import AsyncClient, ASGITransport
|
||
|
|
from fastapi import status
|
||
|
|
from mywebdav.main import app
|
||
|
|
from mywebdav.models import User, Folder, File, WebDAVProperty
|
||
|
|
from mywebdav.auth import get_password_hash
|
||
|
|
import base64
|
||
|
|
from datetime import datetime
|
||
|
|
|
||
|
|
|
||
|
|
@pytest_asyncio.fixture
async def test_user():
    """Provide a persisted, active, non-superuser account for one test.

    The stored password hash is derived from the plain text ``testpass``.
    The row is deleted again once the dependent test has finished.
    """
    account_fields = {
        "username": "testuser",
        "email": "test@example.com",
        "hashed_password": get_password_hash("testpass"),
        "is_active": True,
        "is_superuser": False,
    }
    account = await User.create(**account_fields)
    yield account
    # Teardown: runs after the test that requested the fixture.
    await account.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest_asyncio.fixture
async def test_folder(test_user):
    """Provide a folder named ``testfolder`` owned by ``test_user``.

    The folder row is deleted again once the dependent test has finished.
    """
    created_folder = await Folder.create(name="testfolder", owner=test_user)
    yield created_folder
    # Teardown: runs after the test that requested the fixture.
    await created_folder.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest_asyncio.fixture
async def test_file(test_user, test_folder):
    """Provide a ``testfile.txt`` record inside ``test_folder`` owned by ``test_user``.

    Only the database row is created here (13 bytes, ``text/plain``, a dummy
    hash). The row is deleted once the dependent test has finished.
    """
    file_fields = {
        "name": "testfile.txt",
        "path": "testfile.txt",
        "size": 13,
        "mime_type": "text/plain",
        "file_hash": "dummyhash",
        "owner": test_user,
        "parent": test_folder,
    }
    record = await File.create(**file_fields)
    yield record
    # Teardown: runs after the test that requested the fixture.
    await record.delete()
|
||
|
|
|
||
|
|
|
||
|
|
def get_basic_auth_header(username, password):
    """Return a headers dict carrying HTTP Basic credentials.

    Args:
        username: Login name placed before the colon.
        password: Password placed after the colon.

    Returns:
        ``{"Authorization": "Basic <base64 of 'username:password'>"}``.
    """
    raw_pair = f"{username}:{password}".encode()
    token = base64.b64encode(raw_pair).decode()
    return {"Authorization": "Basic " + token}
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_options():
    """OPTIONS on the WebDAV root returns 200 with ``DAV`` and ``Allow`` headers."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.options("/webdav/")

    assert resp.status_code == status.HTTP_200_OK
    for header_name in ("DAV", "Allow"):
        assert header_name in resp.headers
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_root_unauthorized():
    """PROPFIND on the root without credentials is answered with 401."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("PROPFIND", "/webdav/", headers={"Depth": "1"})

    assert resp.status_code == status.HTTP_401_UNAUTHORIZED
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_root(test_user):
    """Authenticated PROPFIND (Depth 1) on the root returns a 207 multistatus body."""
    request_headers = {"Depth": "1"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("PROPFIND", "/webdav/", headers=request_headers)

    assert resp.status_code == status.HTTP_207_MULTI_STATUS
    # The XML is only checked for marker substrings; it is not parsed.
    body = resp.text
    for marker in ("<D:multistatus", "<D:response>"):
        assert marker in body
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_folder(test_user, test_folder):
    """PROPFIND (Depth 1) on an existing folder returns 207 and mentions its name."""
    folder_url = f"/webdav/{test_folder.name}/"
    request_headers = {"Depth": "1"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("PROPFIND", folder_url, headers=request_headers)

    assert resp.status_code == status.HTTP_207_MULTI_STATUS
    assert test_folder.name in resp.text
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_file(test_user, test_file):
    """PROPFIND (Depth 0) on a file returns 207 with its name and MIME type."""
    file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    request_headers = {"Depth": "0"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("PROPFIND", file_url, headers=request_headers)

    assert resp.status_code == status.HTTP_207_MULTI_STATUS
    body = resp.text
    for expected_fragment in (test_file.name, "text/plain"):
        assert expected_fragment in body
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_get_file(test_user, test_file):
    """GET on a file returns 200 and the bytes produced by the storage manager.

    ``storage_manager.get_file`` is replaced with an async generator stub for
    the duration of the request and restored afterwards, even on failure.
    """
    from mywebdav import storage

    async def fake_get_file(user_id, path):
        # Any path other than the fixture file's is treated as missing.
        if path != test_file.path:
            raise FileNotFoundError()
        yield b"Hello, World!"

    real_get_file = storage.storage_manager.get_file
    storage.storage_manager.get_file = fake_get_file
    try:
        file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            resp = await http.get(
                file_url,
                headers=get_basic_auth_header("testuser", "testpass"),
            )

        assert resp.status_code == status.HTTP_200_OK
        assert resp.content == b"Hello, World!"
    finally:
        storage.storage_manager.get_file = real_get_file
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_put_file(test_user, test_folder):
    """PUT to a new path returns 201, hands the body to storage and records a row.

    ``storage_manager.save_file`` is replaced with a recorder that captures
    the content and path it is called with; the original is restored in a
    ``finally`` block so a failing test cannot leave the stub installed.
    """
    from mywebdav import storage

    original_save_file = storage.storage_manager.save_file

    saved_content = None
    saved_path = None

    async def mock_save_file(user_id, path, content):
        nonlocal saved_content, saved_path
        saved_content = content
        saved_path = path

    storage.storage_manager.save_file = mock_save_file

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.put(
                f"/webdav/{test_folder.name}/newfile.txt",
                content=b"New file content",
                headers=get_basic_auth_header("testuser", "testpass"),
            )

        assert response.status_code == status.HTTP_201_CREATED
        assert saved_content == b"New file content"

        # Check if file was created in DB
        created = await File.get_or_none(
            name="newfile.txt", parent=test_folder, owner=test_user, is_deleted=False
        )
        assert created is not None
        try:
            assert created.path == saved_path
        finally:
            # Delete the row even when the path check fails, so it cannot
            # leak into later tests.
            await created.delete()
    finally:
        storage.storage_manager.save_file = original_save_file
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_mkcol(test_user):
    """MKCOL on a new top-level path returns 201 and creates a root folder row."""
    auth_headers = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("MKCOL", "/webdav/newfolder/", headers=auth_headers)

    assert resp.status_code == status.HTTP_201_CREATED

    # The new collection must exist at the root (no parent) for this owner.
    lookup = {"name": "newfolder", "parent": None, "owner": test_user, "is_deleted": False}
    created_folder = await Folder.get_or_none(**lookup)
    assert created_folder is not None
    await created_folder.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_delete_file(test_user, test_file):
    """DELETE on a file returns 204 and flags the row as deleted.

    The row is re-read by id afterwards, so the test expects it to still
    exist with ``is_deleted`` set rather than to be removed.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.delete(
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            headers=get_basic_auth_header("testuser", "testpass"),
        )

    assert response.status_code == status.HTTP_204_NO_CONTENT

    # Check if file was marked as deleted
    updated_file = await File.get(id=test_file.id)
    assert updated_file.is_deleted
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_delete_folder(test_user, test_folder):
    """DELETE on an empty folder returns 204 and flags the row as deleted.

    The row is re-read by id afterwards, so the test expects it to still
    exist with ``is_deleted`` set rather than to be removed.
    """
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.delete(
            f"/webdav/{test_folder.name}/",
            headers=get_basic_auth_header("testuser", "testpass"),
        )

    assert response.status_code == status.HTTP_204_NO_CONTENT

    # Check if folder was marked as deleted
    updated_folder = await Folder.get(id=test_folder.id)
    assert updated_folder.is_deleted
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_copy_file(test_user, test_file):
    """COPY to a free name in the same folder returns 201 and creates a new row."""
    parent_name = test_file.parent.name
    source_url = f"/webdav/{parent_name}/{test_file.name}"
    request_headers = {
        "Destination": f"http://test/webdav/{parent_name}/copied_{test_file.name}",
    }
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("COPY", source_url, headers=request_headers)

    assert resp.status_code == status.HTTP_201_CREATED

    # The copy must exist next to the source, under the same owner.
    duplicate = await File.get_or_none(
        name=f"copied_{test_file.name}",
        parent=test_file.parent,
        owner=test_user,
        is_deleted=False,
    )
    assert duplicate is not None
    await duplicate.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_move_file(test_user, test_file, test_folder):
    """MOVE into another folder returns 201; the file is then found only there."""
    # A second folder serves as the move target.
    target_folder = await Folder.create(name="destfolder", owner=test_user)

    try:
        source_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
        request_headers = {
            "Destination": f"http://test/webdav/{target_folder.name}/{test_file.name}",
        }
        request_headers.update(get_basic_auth_header("testuser", "testpass"))

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            resp = await http.request("MOVE", source_url, headers=request_headers)

        assert resp.status_code == status.HTTP_201_CREATED

        # A live row with the same name must now exist in the target folder.
        relocated = await File.get_or_none(
            name=test_file.name,
            parent=target_folder,
            owner=test_user,
            is_deleted=False,
        )
        assert relocated is not None

        # The source id must no longer resolve to a non-deleted row.
        leftover = await File.get_or_none(id=test_file.id, is_deleted=False)
        assert leftover is None
    finally:
        await target_folder.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_lock_unlock(test_user, test_file):
    """LOCK returns 200 with a ``Lock-Token``; UNLOCK with that token returns 204."""
    resource_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    auth_headers = get_basic_auth_header("testuser", "testpass")

    # Request body asking for an exclusive write lock.
    lock_xml = """<?xml version="1.0" encoding="utf-8"?>
<D:lockinfo xmlns:D="DAV:">
<D:lockscope><D:exclusive/></D:lockscope>
<D:locktype><D:write/></D:locktype>
</D:lockinfo>"""

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        # Step 1: acquire the lock.
        lock_headers = {"Content-Type": "application/xml", "Timeout": "Second-3600"}
        lock_headers.update(auth_headers)
        lock_resp = await http.request(
            "LOCK", resource_url, content=lock_xml, headers=lock_headers
        )

        assert lock_resp.status_code == status.HTTP_200_OK
        lock_token = lock_resp.headers.get("Lock-Token")
        assert lock_token is not None

        # Step 2: release it using the token returned above.
        unlock_headers = {"Lock-Token": lock_token}
        unlock_headers.update(auth_headers)
        unlock_resp = await http.request("UNLOCK", resource_url, headers=unlock_headers)

        assert unlock_resp.status_code == status.HTTP_204_NO_CONTENT
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_allprop(test_user, test_file):
    """PROPFIND with ``<D:allprop/>`` on a file reports content length and type."""
    propfind_xml = """<?xml version="1.0" encoding="utf-8"?>
<D:propfind xmlns:D="DAV:">
<D:allprop/>
</D:propfind>"""

    file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    request_headers = {"Content-Type": "application/xml", "Depth": "0"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request(
            "PROPFIND", file_url, content=propfind_xml, headers=request_headers
        )

    assert resp.status_code == status.HTTP_207_MULTI_STATUS
    body = resp.text
    for property_name in ("getcontentlength", "getcontenttype"):
        assert property_name in body
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_proppatch(test_user, test_file):
    """PROPPATCH with a ``D:set`` returns 207 and persists the custom property.

    The request sets ``author`` in the ``http://example.com`` namespace on the
    fixture file. The test then looks the property up in ``WebDAVProperty``
    and deletes it again.
    """
    proppatch_xml = """<?xml version="1.0" encoding="utf-8"?>
<D:propertyupdate xmlns:D="DAV:">
<D:set>
<D:prop>
<custom:author xmlns:custom="http://example.com">Test Author</custom:author>
</D:prop>
</D:set>
</D:propertyupdate>"""

    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        response = await client.request(
            "PROPPATCH",
            f"/webdav/{test_file.parent.name}/{test_file.name}",
            content=proppatch_xml,
            headers={
                "Content-Type": "application/xml",
                **get_basic_auth_header("testuser", "testpass"),
            },
        )

    assert response.status_code == status.HTTP_207_MULTI_STATUS

    # Check if property was set
    prop = await WebDAVProperty.get_or_none(
        resource_type="file",
        resource_id=test_file.id,
        namespace="http://example.com",
        name="author",
    )
    # get_or_none returns None when nothing matches; assert explicitly so a
    # missing property fails as an assertion, not an AttributeError below.
    assert prop is not None
    await prop.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_head_file(test_user, test_file):
    """HEAD on a file returns 200, the stored size and MIME type, and no body."""
    file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    auth_headers = get_basic_auth_header("testuser", "testpass")

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.head(file_url, headers=auth_headers)

    assert resp.status_code == status.HTTP_200_OK
    # Header values are strings, so the integer size is compared as text.
    expected_headers = {
        "Content-Length": str(test_file.size),
        "Content-Type": test_file.mime_type,
    }
    for header_name, expected_value in expected_headers.items():
        assert resp.headers[header_name] == expected_value
    assert resp.content == b""
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_put_update_file(test_user, test_file):
    """PUT onto an existing file returns 204 and updates the recorded size.

    ``storage_manager.save_file`` is replaced with a stub that captures the
    written content; the real method is restored afterwards, even on failure.
    """
    from mywebdav import storage

    new_body = b"Updated content"
    captured = None

    async def fake_save_file(user_id, path, content):
        nonlocal captured
        captured = content

    real_save_file = storage.storage_manager.save_file
    storage.storage_manager.save_file = fake_save_file
    try:
        file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            resp = await http.put(
                file_url,
                content=b"Updated content",
                headers=get_basic_auth_header("testuser", "testpass"),
            )

        assert resp.status_code == status.HTTP_204_NO_CONTENT
        assert captured == b"Updated content"

        # The stored row must carry the length of the new body.
        refreshed = await File.get(id=test_file.id)
        assert refreshed.size == len(new_body)
    finally:
        storage.storage_manager.save_file = real_save_file
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_copy_file_overwrite_true(test_user, test_file):
    """COPY with ``Overwrite: T`` onto an existing file returns 204 and updates it.

    The existing destination row is expected to be reused: it keeps its id,
    name and parent and takes the source's hash and size. The destination
    row is deleted afterwards regardless of the outcome.
    """
    dest_file = await File.create(
        name="destination.txt", parent=test_file.parent, owner=test_user,
        path="dest.txt", size=1, mime_type="text/plain", file_hash="oldhash",
    )
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "COPY",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{test_file.parent.name}/destination.txt",
                    "Overwrite": "T",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )

        assert response.status_code == status.HTTP_204_NO_CONTENT

        # The original destination file should have been updated
        updated_dest_file = await File.get(id=dest_file.id)
        assert not updated_dest_file.is_deleted
        assert updated_dest_file.file_hash == test_file.file_hash
        assert updated_dest_file.size == test_file.size
        assert updated_dest_file.name == dest_file.name  # Name should remain the same
        assert updated_dest_file.parent_id == test_file.parent_id  # Parent should remain the same

        # No new file should have been created with the destination name
        new_file_check = await File.get_or_none(
            name="destination.txt", parent=test_file.parent, is_deleted=False
        )
        # Guard against None so a missing row fails as an assertion, not an
        # AttributeError on ``.id``.
        assert new_file_check is not None
        assert new_file_check.id == updated_dest_file.id  # Should be the same updated file
    finally:
        await dest_file.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_move_file_overwrite_true(test_user, test_file):
    """MOVE with ``Overwrite: T`` onto an existing file returns 204 and updates it.

    The source row is expected to end up flagged as deleted. The existing
    destination row keeps its id, name and parent and takes the source's hash
    and size. Both rows created here are removed afterwards.
    """
    dest_folder = await Folder.create(name="destfolder", owner=test_user)
    existing_dest_file = await File.create(
        name=test_file.name, parent=dest_folder, owner=test_user,
        path="existing.txt", size=1, mime_type="text/plain", file_hash="oldhash",
    )

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "MOVE",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
                    "Overwrite": "T",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )

        assert response.status_code == status.HTTP_204_NO_CONTENT

        # The original source file should be marked as deleted
        original_source_file = await File.get_or_none(id=test_file.id, is_deleted=True)
        assert original_source_file is not None

        # The existing destination file should have been updated
        updated_dest_file = await File.get(id=existing_dest_file.id)
        assert not updated_dest_file.is_deleted
        assert updated_dest_file.file_hash == test_file.file_hash  # Should have source's hash
        assert updated_dest_file.size == test_file.size  # Should have source's size
        assert updated_dest_file.name == existing_dest_file.name  # Name should remain the same
        assert updated_dest_file.parent_id == dest_folder.id  # Parent should remain the same

        # No new file should have been created with the destination name
        new_file_check = await File.get_or_none(
            name=test_file.name, parent=dest_folder, is_deleted=False
        )
        # Guard against None so a missing row fails as an assertion, not an
        # AttributeError on ``.id``.
        assert new_file_check is not None
        assert new_file_check.id == updated_dest_file.id  # Should be the same updated file
    finally:
        # Delete the child file before its parent folder so cleanup does not
        # depend on how the parent relation handles deletion.
        await existing_dest_file.delete()
        await dest_folder.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_proppatch_remove(test_user, test_file):
    """PROPPATCH with a ``D:remove`` returns 207 and deletes the stored property."""
    # Seed the property that the request will remove.
    seeded = await WebDAVProperty.create(
        resource_type="file",
        resource_id=test_file.id,
        namespace="http://example.com",
        name="author",
        value="Test Author",
    )

    proppatch_xml = """<?xml version="1.0" encoding="utf-8"?>
<D:propertyupdate xmlns:D="DAV:">
<D:remove>
<D:prop>
<custom:author xmlns:custom="http://example.com"/>
</D:prop>
</D:remove>
</D:propertyupdate>"""

    file_url = f"/webdav/{test_file.parent.name}/{test_file.name}"
    request_headers = {"Content-Type": "application/xml"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request(
            "PROPPATCH", file_url, content=proppatch_xml, headers=request_headers
        )

    assert resp.status_code == status.HTTP_207_MULTI_STATUS
    assert "200 OK" in resp.text

    # The seeded row must no longer be retrievable by id.
    still_there = await WebDAVProperty.get_or_none(id=seeded.id)
    assert still_there is None
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_propfind_not_found(test_user):
    """Authenticated PROPFIND on a path that does not exist returns 404."""
    request_headers = {"Depth": "1"}
    request_headers.update(get_basic_auth_header("testuser", "testpass"))

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request(
            "PROPFIND", "/webdav/nonexistentfolder/", headers=request_headers
        )

    assert resp.status_code == status.HTTP_404_NOT_FOUND
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_mkcol_nested_fail(test_user):
    """MKCOL below a parent collection that does not exist returns 409."""
    auth_headers = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request(
            "MKCOL", "/webdav/parent/newfolder/", headers=auth_headers
        )

    # "parent" was never created, so the request must be a conflict.
    assert resp.status_code == status.HTTP_409_CONFLICT
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_mkcol_already_exists(test_user, test_folder):
    """MKCOL on a path where a collection already exists returns 405."""
    existing_url = f"/webdav/{test_folder.name}/"
    auth_headers = get_basic_auth_header("testuser", "testpass")

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.request("MKCOL", existing_url, headers=auth_headers)

    # The fixture folder already occupies this path.
    assert resp.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_delete_non_existent_file(test_user):
    """DELETE on a path that does not exist returns 404."""
    auth_headers = get_basic_auth_header("testuser", "testpass")
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.delete("/webdav/nonexistent.txt", headers=auth_headers)

    assert resp.status_code == status.HTTP_404_NOT_FOUND
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_delete_non_empty_folder(test_user, test_file):
    """DELETE on a folder that still contains a file returns 409."""
    # The fixture file's parent is the folder under test.
    occupied_folder = test_file.parent
    folder_url = f"/webdav/{occupied_folder.name}/"
    auth_headers = get_basic_auth_header("testuser", "testpass")

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http:
        resp = await http.delete(folder_url, headers=auth_headers)

    assert resp.status_code == status.HTTP_409_CONFLICT
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_copy_file_overwrite_false_fail(test_user, test_file):
    """COPY with ``Overwrite: F`` onto an existing destination returns 412."""
    # Pre-create the destination so the copy has something to collide with.
    occupied_name = f"copied_{test_file.name}"
    blocker = await File.create(
        name=occupied_name,
        path=occupied_name,
        size=1,
        mime_type="text/plain",
        file_hash="dummyhash2",
        owner=test_user,
        parent=test_file.parent,
    )

    try:
        parent_name = test_file.parent.name
        source_url = f"/webdav/{parent_name}/{test_file.name}"
        request_headers = {
            "Destination": f"http://test/webdav/{parent_name}/{blocker.name}",
            "Overwrite": "F",
        }
        request_headers.update(get_basic_auth_header("testuser", "testpass"))

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as http:
            resp = await http.request("COPY", source_url, headers=request_headers)

        # Overwrite is refused and the destination exists: precondition fails.
        assert resp.status_code == status.HTTP_412_PRECONDITION_FAILED
    finally:
        await blocker.delete()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_webdav_move_file_overwrite_false_fail(test_user, test_file):
    """MOVE with ``Overwrite: F`` onto an existing destination returns 412.

    A second folder holding a file with the same name as the source is
    created as the collision target. Both rows are removed afterwards.
    """
    dest_folder = await Folder.create(name="destfolder", owner=test_user)
    # Create a file with the same name at the destination
    existing_dest_file = await File.create(
        name=test_file.name,
        path=f"{dest_folder.name}/{test_file.name}",
        size=1,
        mime_type="text/plain",
        file_hash="dummyhash3",
        owner=test_user,
        parent=dest_folder,
    )

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app), base_url="http://test"
        ) as client:
            response = await client.request(
                "MOVE",
                f"/webdav/{test_file.parent.name}/{test_file.name}",
                headers={
                    "Destination": f"http://test/webdav/{dest_folder.name}/{test_file.name}",
                    "Overwrite": "F",
                    **get_basic_auth_header("testuser", "testpass"),
                },
            )
        # 412 Precondition Failed because Overwrite is 'F' and destination exists
        assert response.status_code == status.HTTP_412_PRECONDITION_FAILED
    finally:
        # Delete the child file before its parent folder so cleanup does not
        # depend on how the parent relation handles deletion.
        await existing_dest_file.delete()
        await dest_folder.delete()
|