"""
Comprehensive Test Suite for WebDAV Server
Tests all WebDAV methods, authentication, and edge cases
"""
import pytest
import asyncio
import tempfile
import shutil
from pathlib import Path
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
import base64
# Import the main application
import sys
sys.path.insert(0, '.')
from main import Database, AuthHandler, WebDAVHandler, init_app, Config
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
async def temp_dir():
"""Create temporary directory for tests"""
temp_path = Path(tempfile.mkdtemp())
yield temp_path
shutil.rmtree(temp_path, ignore_errors=True)
@pytest.fixture
async def test_db(temp_dir):
"""Create test database"""
db_path = temp_dir / 'test.db'
db = Database(str(db_path))
# Create test users
await db.create_user('testuser', 'testpass123', 'test@example.com', 'user')
await db.create_user('admin', 'adminpass123', 'admin@example.com', 'admin')
yield db
@pytest.fixture
async def test_app(test_db, temp_dir, monkeypatch):
"""Create test application"""
# Override config for testing
monkeypatch.setattr(Config, 'DB_PATH', str(temp_dir / 'test.db'))
monkeypatch.setattr(Config, 'WEBDAV_ROOT', str(temp_dir / 'webdav'))
# Create webdav root
(temp_dir / 'webdav' / 'users' / 'testuser').mkdir(parents=True, exist_ok=True)
(temp_dir / 'webdav' / 'users' / 'admin').mkdir(parents=True, exist_ok=True)
app = await init_app()
app['db'] = test_db
yield app
@pytest.fixture
async def client(test_app):
"""Create test client"""
server = TestServer(test_app)
client = TestClient(server)
await client.start_server()
yield client
await client.close()
@pytest.fixture
def basic_auth_header():
"""Create basic auth header"""
credentials = base64.b64encode(b'testuser:testpass123').decode('utf-8')
return {'Authorization': f'Basic {credentials}'}
# ============================================================================
# Authentication Tests
# ============================================================================
class TestAuthentication:
"""Test authentication mechanisms"""
@pytest.mark.asyncio
async def test_no_auth_returns_401(self, client):
"""Test that requests without auth return 401"""
response = await client.get('/')
assert response.status == 401
assert 'WWW-Authenticate' in response.headers
@pytest.mark.asyncio
async def test_basic_auth_success(self, client, basic_auth_header):
"""Test successful basic authentication"""
response = await client.get('/', headers=basic_auth_header)
assert response.status in [200, 207] # 207 for PROPFIND
@pytest.mark.asyncio
async def test_basic_auth_invalid_credentials(self, client):
"""Test basic auth with invalid credentials"""
credentials = base64.b64encode(b'testuser:wrongpass').decode('utf-8')
headers = {'Authorization': f'Basic {credentials}'}
response = await client.get('/', headers=headers)
assert response.status == 401
@pytest.mark.asyncio
async def test_basic_auth_malformed(self, client):
"""Test basic auth with malformed header"""
headers = {'Authorization': 'Basic invalid-base64'}
response = await client.get('/', headers=headers)
assert response.status == 401
# ============================================================================
# HTTP Methods Tests
# ============================================================================
class TestHTTPMethods:
"""Test standard HTTP methods"""
@pytest.mark.asyncio
async def test_options(self, client, basic_auth_header):
"""Test OPTIONS method"""
response = await client.options('/', headers=basic_auth_header)
assert response.status == 200
assert 'DAV' in response.headers
assert 'Allow' in response.headers
allow = response.headers['Allow']
assert 'PROPFIND' in allow
assert 'PROPPATCH' in allow
assert 'MKCOL' in allow
assert 'LOCK' in allow
assert 'UNLOCK' in allow
@pytest.mark.asyncio
async def test_get_nonexistent_file(self, client, basic_auth_header):
"""Test GET on nonexistent file"""
response = await client.get('/nonexistent.txt', headers=basic_auth_header)
assert response.status == 404
@pytest.mark.asyncio
async def test_put_and_get_file(self, client, basic_auth_header):
"""Test PUT and GET file"""
content = b'Hello, WebDAV!'
# PUT file
put_response = await client.put(
'/test.txt',
data=content,
headers=basic_auth_header
)
assert put_response.status in [201, 204]
# GET file
get_response = await client.get('/test.txt', headers=basic_auth_header)
assert get_response.status == 200
body = await get_response.read()
assert body == content
@pytest.mark.asyncio
async def test_head_file(self, client, basic_auth_header):
"""Test HEAD method"""
# Create a file first
content = b'Test content'
await client.put('/test.txt', data=content, headers=basic_auth_header)
# HEAD request
response = await client.head('/test.txt', headers=basic_auth_header)
assert response.status == 200
assert 'Content-Length' in response.headers
assert int(response.headers['Content-Length']) == len(content)
# Body should be empty
body = await response.read()
assert body == b''
@pytest.mark.asyncio
async def test_delete_file(self, client, basic_auth_header):
"""Test DELETE method"""
# Create a file
await client.put('/test.txt', data=b'content', headers=basic_auth_header)
# Delete it
response = await client.delete('/test.txt', headers=basic_auth_header)
assert response.status == 204
# Verify it's gone
get_response = await client.get('/test.txt', headers=basic_auth_header)
assert get_response.status == 404
# ============================================================================
# WebDAV Methods Tests
# ============================================================================
class TestWebDAVMethods:
"""Test WebDAV-specific methods"""
@pytest.mark.asyncio
async def test_mkcol(self, client, basic_auth_header):
"""Test MKCOL (create collection)"""
response = await client.request(
'MKCOL',
'/newdir/',
headers=basic_auth_header
)
assert response.status == 201
# Verify directory was created with PROPFIND
propfind_response = await client.request(
'PROPFIND',
'/newdir/',
headers={**basic_auth_header, 'Depth': '0'}
)
assert propfind_response.status == 207
@pytest.mark.asyncio
async def test_mkcol_already_exists(self, client, basic_auth_header):
"""Test MKCOL on existing directory"""
# Create directory
await client.request('MKCOL', '/testdir/', headers=basic_auth_header)
# Try to create again
response = await client.request('MKCOL', '/testdir/', headers=basic_auth_header)
assert response.status == 405 # Method Not Allowed
@pytest.mark.asyncio
async def test_propfind_depth_0(self, client, basic_auth_header):
"""Test PROPFIND with depth 0"""
# Create a file
await client.put('/test.txt', data=b'content', headers=basic_auth_header)
# PROPFIND with depth 0
propfind_body = b'''<?xml version="1.0"?>
<D:propfind xmlns:D="DAV:">
<D:allprop/>
</D:propfind>'''
response = await client.request(
'PROPFIND',
'/test.txt',
data=propfind_body,
headers={**basic_auth_header, 'Depth': '0', 'Content-Type': 'application/xml'}
)
assert response.status == 207
body = await response.text()
assert 'multistatus' in body
assert 'test.txt' in body
@pytest.mark.asyncio
async def test_propfind_depth_1(self, client, basic_auth_header):
"""Test PROPFIND with depth 1"""
# Create directory with files
await client.request('MKCOL', '/testdir/', headers=basic_auth_header)
await client.put('/testdir/file1.txt', data=b'content1', headers=basic_auth_header)
await client.put('/testdir/file2.txt', data=b'content2', headers=basic_auth_header)
# PROPFIND with depth 1
response = await client.request(
'PROPFIND',
'/testdir/',
headers={**basic_auth_header, 'Depth': '1'}
)
assert response.status == 207
body = await response.text()
assert 'file1.txt' in body
assert 'file2.txt' in body
@pytest.mark.asyncio
async def test_proppatch(self, client, basic_auth_header):
"""Test PROPPATCH (set properties)"""
# Create a file
await client.put('/test.txt', data=b'content', headers=basic_auth_header)
# Set custom property
proppatch_body = b'''<?xml version="1.0"?>
<D:propertyupdate xmlns:D="DAV:">
<D:set>
<D:prop>
<D:displayname>My Document</D:displayname>
</D:prop>
</D:set>
</D:propertyupdate>'''
response = await client.request(
'PROPPATCH',
'/test.txt',
data=proppatch_body,
headers={**basic_auth_header, 'Content-Type': 'application/xml'}
)
assert response.status == 207
@pytest.mark.asyncio
async def test_copy(self, client, basic_auth_header):
"""Test COPY method"""
# Create source file
await client.put('/source.txt', data=b'content', headers=basic_auth_header)
# Copy to destination
response = await client.request(
'COPY',
'/source.txt',
headers={
**basic_auth_header,
'Destination': '/dest.txt'
}
)
assert response.status in [201, 204]
# Verify both files exist
source_response = await client.get('/source.txt', headers=basic_auth_header)
assert source_response.status == 200
dest_response = await client.get('/dest.txt', headers=basic_auth_header)
assert dest_response.status == 200
# Verify content is the same
source_content = await source_response.read()
dest_content = await dest_response.read()
assert source_content == dest_content
@pytest.mark.asyncio
async def test_move(self, client, basic_auth_header):
"""Test MOVE method"""
# Create source file
await client.put('/source.txt', data=b'content', headers=basic_auth_header)
# Move to destination
response = await client.request(
'MOVE',
'/source.txt',
headers={
**basic_auth_header,
'Destination': '/dest.txt'
}
)
assert response.status in [201, 204]
# Verify source is gone
source_response = await client.get('/source.txt', headers=basic_auth_header)
assert source_response.status == 404
# Verify destination exists
dest_response = await client.get('/dest.txt', headers=basic_auth_header)
assert dest_response.status == 200
@pytest.mark.asyncio
async def test_lock_and_unlock(self, client, basic_auth_header):
"""Test LOCK and UNLOCK methods"""
# Create a file
await client.put('/test.txt', data=b'content', headers=basic_auth_header)
# Lock the file
lock_body = b'''<?xml version="1.0"?>
<D:lockinfo xmlns:D="DAV:">
<D:lockscope><D:exclusive/></D:lockscope>
<D:locktype><D:write/></D:locktype>
<D:owner>
<D:href>mailto:test@example.com</D:href>
</D:owner>
</D:lockinfo>'''
lock_response = await client.request(
'LOCK',
'/test.txt',
data=lock_body,
headers={**basic_auth_header, 'Content-Type': 'application/xml', 'Timeout': 'Second-3600'}
)
assert lock_response.status == 200
assert 'Lock-Token' in lock_response.headers
lock_token = lock_response.headers['Lock-Token'].strip('<>')
# Unlock the file
unlock_response = await client.request(
'UNLOCK',
'/test.txt',
headers={**basic_auth_header, 'Lock-Token': f'<{lock_token}>'}
)
assert unlock_response.status == 204
# ============================================================================
# Security Tests
# ============================================================================
class TestSecurity:
"""Test security features"""
@pytest.mark.asyncio
async def test_path_traversal_prevention(self, client, basic_auth_header):
"""Test that path traversal is prevented"""
# Try to access parent directory
response = await client.get('/../../../etc/passwd', headers=basic_auth_header)
assert response.status in [403, 404]
@pytest.mark.asyncio
async def test_user_isolation(self, client):
"""Test that users can't access other users' files"""
# Create file as testuser
testuser_auth = base64.b64encode(b'testuser:testpass123').decode('utf-8')
testuser_headers = {'Authorization': f'Basic {testuser_auth}'}
await client.put('/myfile.txt', data=b'secret', headers=testuser_headers)
# Try to access as admin
admin_auth = base64.b64encode(b'admin:adminpass123').decode('utf-8')
admin_headers = {'Authorization': f'Basic {admin_auth}'}
# This should fail because each user has their own isolated directory
# The path /myfile.txt for admin is different from /myfile.txt for testuser
response = await client.get('/myfile.txt', headers=admin_headers)
assert response.status == 404
# ============================================================================
# Database Tests
# ============================================================================
class TestDatabase:
"""Test database operations"""
@pytest.mark.asyncio
async def test_create_user(self, test_db):
"""Test user creation"""
user_id = await test_db.create_user('newuser', 'password123', 'new@example.com')
assert user_id > 0
# Verify user can be retrieved
user = await test_db.verify_user('newuser', 'password123')
assert user is not None
assert user['username'] == 'newuser'
@pytest.mark.asyncio
async def test_verify_user_wrong_password(self, test_db):
"""Test user verification with wrong password"""
user = await test_db.verify_user('testuser', 'wrongpassword')
assert user is None
@pytest.mark.asyncio
async def test_lock_creation(self, test_db):
"""Test lock creation"""
lock_token = await test_db.create_lock(
'/test.txt',
1,
'write',
'exclusive',
0,
3600,
'test@example.com'
)
assert lock_token.startswith('opaquelocktoken:')
# Retrieve lock
lock = await test_db.get_lock('/test.txt')
assert lock is not None
assert lock['lock_token'] == lock_token
@pytest.mark.asyncio
async def test_lock_removal(self, test_db):
"""Test lock removal"""
lock_token = await test_db.create_lock('/test.txt', 1)
removed = await test_db.remove_lock(lock_token, 1)
assert removed is True
# Verify lock is gone
lock = await test_db.get_lock('/test.txt')
assert lock is None
@pytest.mark.asyncio
async def test_property_management(self, test_db):
"""Test property set and get"""
await test_db.set_property('/test.txt', 'DAV:', 'displayname', 'My File')
props = await test_db.get_properties('/test.txt')
assert len(props) > 0
assert any(p['property_name'] == 'displayname' for p in props)
# ============================================================================
# Integration Tests
# ============================================================================
class TestIntegration:
"""Integration tests for complex workflows"""
@pytest.mark.asyncio
async def test_complete_workflow(self, client, basic_auth_header):
"""Test complete file management workflow"""
# 1. Create directory
await client.request('MKCOL', '/docs/', headers=basic_auth_header)
# 2. Upload files
await client.put('/docs/file1.txt', data=b'content1', headers=basic_auth_header)
await client.put('/docs/file2.txt', data=b'content2', headers=basic_auth_header)
# 3. List directory
response = await client.request(
'PROPFIND',
'/docs/',
headers={**basic_auth_header, 'Depth': '1'}
)
assert response.status == 207
body = await response.text()
assert 'file1.txt' in body
assert 'file2.txt' in body
# 4. Copy file
await client.request(
'COPY',
'/docs/file1.txt',
headers={**basic_auth_header, 'Destination': '/docs/file1_copy.txt'}
)
# 5. Move file
await client.request(
'MOVE',
'/docs/file2.txt',
headers={**basic_auth_header, 'Destination': '/docs/renamed.txt'}
)
# 6. Verify final state
response = await client.request(
'PROPFIND',
'/docs/',
headers={**basic_auth_header, 'Depth': '1'}
)
body = await response.text()
assert 'file1.txt' in body
assert 'file1_copy.txt' in body
assert 'renamed.txt' in body
assert 'file2.txt' not in body
# 7. Delete directory
await client.delete('/docs/', headers=basic_auth_header)
# 8. Verify deletion
response = await client.request(
'PROPFIND',
'/docs/',
headers={**basic_auth_header, 'Depth': '0'}
)
assert response.status == 404
# ============================================================================
# Run Tests
# ============================================================================
if __name__ == '__main__':
pytest.main([__file__, '-v', '--tb=short'])