"""
Comprehensive Test Suite for WebDAV Server
Tests all WebDAV methods, authentication, and edge cases
"""
import pytest
import pytest_asyncio
import asyncio
import tempfile
import shutil
from pathlib import Path
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
import base64
# Import the main application
import sys
sys.path.insert(0, '.')
from main import Database, AuthHandler, WebDAVHandler, init_app, Config
# ============================================================================
# Fixtures
# ============================================================================
@pytest_asyncio.fixture
async def temp_dir():
    """Yield a fresh scratch directory and remove it once the test is done.

    The directory comes from ``tempfile.mkdtemp``; removal uses
    ``ignore_errors=True`` so a cleanup problem does not fail the test.
    """
    scratch_name = tempfile.mkdtemp()
    scratch = Path(scratch_name)
    yield scratch
    # Best-effort teardown of everything the test wrote below the directory.
    shutil.rmtree(scratch, ignore_errors=True)
@pytest_asyncio.fixture
async def test_db(temp_dir):
    """Yield a ``Database`` stored in ``temp_dir`` with two seeded accounts.

    The accounts are ``testuser`` and ``admin``; they are created in that
    order before the database is handed to the test.
    """
    database = Database(str(temp_dir / 'test.db'))

    # Arguments are forwarded positionally to Database.create_user.
    seed_accounts = (
        ('testuser', 'testpass123', 'test@example.com', 'user'),
        ('admin', 'adminpass123', 'admin@example.com', 'admin'),
    )
    for account in seed_accounts:
        await database.create_user(*account)

    yield database
@pytest_asyncio.fixture
async def test_app(test_db, temp_dir, monkeypatch):
    """Yield the application wired to the temporary database and WebDAV root.

    ``Config.DB_PATH`` and ``Config.WEBDAV_ROOT`` are monkeypatched to point
    inside ``temp_dir``, per-user directories are created for both seeded
    accounts, and ``app['db']`` is replaced with the ``test_db`` fixture.
    """
    webdav_root = temp_dir / 'webdav'

    # Point the configuration at the scratch directory for this test only.
    overrides = (
        ('DB_PATH', str(temp_dir / 'test.db')),
        ('WEBDAV_ROOT', str(webdav_root)),
    )
    for attribute, value in overrides:
        monkeypatch.setattr(Config, attribute, value)

    # Each seeded account gets its own directory below the WebDAV root.
    for username in ('testuser', 'admin'):
        (webdav_root / 'users' / username).mkdir(parents=True, exist_ok=True)

    application = await init_app()
    application['db'] = test_db
    yield application
@pytest_asyncio.fixture
async def client(test_app):
    """Yield a started ``TestClient`` bound to ``test_app``; close it afterwards."""
    # Entering the context starts the server, leaving it closes the client.
    async with TestClient(TestServer(test_app)) as http_client:
        yield http_client
@pytest.fixture
def basic_auth_header():
    """Return an ``Authorization`` header dict with Basic credentials for ``testuser``."""
    token = base64.b64encode(b'testuser:testpass123')
    return {'Authorization': 'Basic ' + token.decode('utf-8')}
# ============================================================================
# Authentication Tests
# ============================================================================
class TestAuthentication:
    """Server responses to missing, valid, wrong and malformed credentials."""

    @pytest.mark.asyncio
    async def test_no_auth_returns_401(self, client):
        """A request without credentials gets 401 plus an auth challenge."""
        resp = await client.get('/')
        assert resp.status == 401
        assert 'WWW-Authenticate' in resp.headers

    @pytest.mark.asyncio
    async def test_basic_auth_success(self, client, basic_auth_header):
        """Valid Basic credentials are accepted."""
        resp = await client.get('/', headers=basic_auth_header)
        # Either a plain 200 or a 207 multi-status counts as success here.
        assert resp.status in [200, 207]

    @pytest.mark.asyncio
    async def test_basic_auth_invalid_credentials(self, client):
        """A wrong password for an existing user is rejected with 401."""
        token = base64.b64encode(b'testuser:wrongpass').decode('utf-8')
        resp = await client.get('/', headers={'Authorization': f'Basic {token}'})
        assert resp.status == 401

    @pytest.mark.asyncio
    async def test_basic_auth_malformed(self, client):
        """A Basic header whose payload is not valid base64 is rejected with 401."""
        bad_headers = {'Authorization': 'Basic invalid-base64'}
        resp = await client.get('/', headers=bad_headers)
        assert resp.status == 401
# ============================================================================
# HTTP Methods Tests
# ============================================================================
class TestHTTPMethods:
    """OPTIONS, GET, PUT, HEAD and DELETE against the authenticated root."""

    @pytest.mark.asyncio
    async def test_options(self, client, basic_auth_header):
        """OPTIONS returns 200 with a DAV header and the WebDAV verbs in Allow."""
        resp = await client.options('/', headers=basic_auth_header)
        assert resp.status == 200
        assert 'DAV' in resp.headers
        assert 'Allow' in resp.headers

        advertised = resp.headers['Allow']
        for verb in ('PROPFIND', 'PROPPATCH', 'MKCOL', 'LOCK', 'UNLOCK'):
            assert verb in advertised

    @pytest.mark.asyncio
    async def test_get_nonexistent_file(self, client, basic_auth_header):
        """GET on a path that was never created returns 404."""
        resp = await client.get('/nonexistent.txt', headers=basic_auth_header)
        assert resp.status == 404

    @pytest.mark.asyncio
    async def test_put_and_get_file(self, client, basic_auth_header):
        """A file uploaded with PUT is returned unchanged by GET."""
        payload = b'Hello, WebDAV!'

        # Upload
        upload = await client.put(
            '/test.txt', data=payload, headers=basic_auth_header
        )
        assert upload.status in [201, 204]

        # Download and compare
        download = await client.get('/test.txt', headers=basic_auth_header)
        assert download.status == 200
        assert await download.read() == payload

    @pytest.mark.asyncio
    async def test_head_file(self, client, basic_auth_header):
        """HEAD reports the file's length and carries no body."""
        payload = b'Test content'
        await client.put('/test.txt', data=payload, headers=basic_auth_header)

        resp = await client.head('/test.txt', headers=basic_auth_header)
        assert resp.status == 200
        assert 'Content-Length' in resp.headers
        reported_length = int(resp.headers['Content-Length'])
        assert reported_length == len(payload)

        # A HEAD response must not include the entity body.
        assert await resp.read() == b''

    @pytest.mark.asyncio
    async def test_delete_file(self, client, basic_auth_header):
        """DELETE returns 204 and a following GET returns 404."""
        await client.put('/test.txt', data=b'content', headers=basic_auth_header)

        removal = await client.delete('/test.txt', headers=basic_auth_header)
        assert removal.status == 204

        # The resource must no longer be retrievable.
        follow_up = await client.get('/test.txt', headers=basic_auth_header)
        assert follow_up.status == 404
# ============================================================================
# WebDAV Methods Tests
# ============================================================================
class TestWebDAVMethods:
    """MKCOL, PROPFIND, PROPPATCH, COPY, MOVE, LOCK and UNLOCK requests."""

    # Request bodies sent by the tests below.
    _ALLPROP_BODY = (
        b'<?xml version="1.0"?>\n'
        b'<D:propfind xmlns:D="DAV:">\n'
        b'<D:allprop/>\n'
        b'</D:propfind>'
    )
    _SET_DISPLAYNAME_BODY = (
        b'<?xml version="1.0"?>\n'
        b'<D:propertyupdate xmlns:D="DAV:">\n'
        b'<D:set>\n'
        b'<D:prop>\n'
        b'<D:displayname>My Document</D:displayname>\n'
        b'</D:prop>\n'
        b'</D:set>\n'
        b'</D:propertyupdate>'
    )
    _EXCLUSIVE_WRITE_LOCK_BODY = (
        b'<?xml version="1.0"?>\n'
        b'<D:lockinfo xmlns:D="DAV:">\n'
        b'<D:lockscope><D:exclusive/></D:lockscope>\n'
        b'<D:locktype><D:write/></D:locktype>\n'
        b'<D:owner>\n'
        b'<D:href>mailto:test@example.com</D:href>\n'
        b'</D:owner>\n'
        b'</D:lockinfo>'
    )

    @pytest.mark.asyncio
    async def test_mkcol(self, client, basic_auth_header):
        """MKCOL returns 201 and the new collection answers PROPFIND with 207."""
        created = await client.request(
            'MKCOL', '/newdir/', headers=basic_auth_header
        )
        assert created.status == 201

        # Confirm the collection exists by querying it.
        headers = dict(basic_auth_header)
        headers['Depth'] = '0'
        listing = await client.request('PROPFIND', '/newdir/', headers=headers)
        assert listing.status == 207

    @pytest.mark.asyncio
    async def test_mkcol_already_exists(self, client, basic_auth_header):
        """A second MKCOL on the same path returns 405 Method Not Allowed."""
        await client.request('MKCOL', '/testdir/', headers=basic_auth_header)

        repeat = await client.request(
            'MKCOL', '/testdir/', headers=basic_auth_header
        )
        assert repeat.status == 405

    @pytest.mark.asyncio
    async def test_propfind_depth_0(self, client, basic_auth_header):
        """PROPFIND allprop with Depth 0 on a file returns a multistatus for it."""
        await client.put('/test.txt', data=b'content', headers=basic_auth_header)

        headers = dict(basic_auth_header)
        headers['Depth'] = '0'
        headers['Content-Type'] = 'application/xml'
        resp = await client.request(
            'PROPFIND', '/test.txt', data=self._ALLPROP_BODY, headers=headers
        )
        assert resp.status == 207

        text = await resp.text()
        assert 'multistatus' in text
        assert 'test.txt' in text

    @pytest.mark.asyncio
    async def test_propfind_depth_1(self, client, basic_auth_header):
        """PROPFIND with Depth 1 on a collection lists the files inside it."""
        await client.request('MKCOL', '/testdir/', headers=basic_auth_header)
        await client.put(
            '/testdir/file1.txt', data=b'content1', headers=basic_auth_header
        )
        await client.put(
            '/testdir/file2.txt', data=b'content2', headers=basic_auth_header
        )

        headers = dict(basic_auth_header)
        headers['Depth'] = '1'
        resp = await client.request('PROPFIND', '/testdir/', headers=headers)
        assert resp.status == 207

        text = await resp.text()
        assert 'file1.txt' in text
        assert 'file2.txt' in text

    @pytest.mark.asyncio
    async def test_proppatch(self, client, basic_auth_header):
        """PROPPATCH setting ``displayname`` on a file returns 207."""
        await client.put('/test.txt', data=b'content', headers=basic_auth_header)

        headers = dict(basic_auth_header)
        headers['Content-Type'] = 'application/xml'
        resp = await client.request(
            'PROPPATCH',
            '/test.txt',
            data=self._SET_DISPLAYNAME_BODY,
            headers=headers,
        )
        assert resp.status == 207

    @pytest.mark.asyncio
    async def test_copy(self, client, basic_auth_header):
        """COPY leaves the source in place and creates an identical destination."""
        await client.put('/source.txt', data=b'content', headers=basic_auth_header)

        headers = dict(basic_auth_header)
        headers['Destination'] = '/dest.txt'
        copied = await client.request('COPY', '/source.txt', headers=headers)
        assert copied.status in [201, 204]

        # Both resources must now be retrievable ...
        original = await client.get('/source.txt', headers=basic_auth_header)
        assert original.status == 200
        duplicate = await client.get('/dest.txt', headers=basic_auth_header)
        assert duplicate.status == 200

        # ... and carry the same bytes.
        original_bytes = await original.read()
        duplicate_bytes = await duplicate.read()
        assert original_bytes == duplicate_bytes

    @pytest.mark.asyncio
    async def test_move(self, client, basic_auth_header):
        """MOVE removes the source and makes the destination available."""
        await client.put('/source.txt', data=b'content', headers=basic_auth_header)

        headers = dict(basic_auth_header)
        headers['Destination'] = '/dest.txt'
        moved = await client.request('MOVE', '/source.txt', headers=headers)
        assert moved.status in [201, 204]

        # The old path is gone ...
        old_location = await client.get('/source.txt', headers=basic_auth_header)
        assert old_location.status == 404

        # ... and the new one serves the file.
        new_location = await client.get('/dest.txt', headers=basic_auth_header)
        assert new_location.status == 200

    @pytest.mark.asyncio
    async def test_lock_and_unlock(self, client, basic_auth_header):
        """LOCK returns 200 with a Lock-Token that UNLOCK then accepts (204)."""
        await client.put('/test.txt', data=b'content', headers=basic_auth_header)

        # Request an exclusive write lock with a one-hour timeout.
        lock_headers = dict(basic_auth_header)
        lock_headers['Content-Type'] = 'application/xml'
        lock_headers['Timeout'] = 'Second-3600'
        locked = await client.request(
            'LOCK',
            '/test.txt',
            data=self._EXCLUSIVE_WRITE_LOCK_BODY,
            headers=lock_headers,
        )
        assert locked.status == 200
        assert 'Lock-Token' in locked.headers

        # The header value is wrapped in angle brackets; keep only the token.
        token = locked.headers['Lock-Token'].strip('<>')

        unlock_headers = dict(basic_auth_header)
        unlock_headers['Lock-Token'] = f'<{token}>'
        unlocked = await client.request(
            'UNLOCK', '/test.txt', headers=unlock_headers
        )
        assert unlocked.status == 204
# ============================================================================
# Security Tests
# ============================================================================
class TestSecurity:
    """Path traversal and per-user isolation checks."""

    @pytest.mark.asyncio
    async def test_path_traversal_prevention(self, client, basic_auth_header):
        """A path that climbs above the root must be refused (403) or not found (404)."""
        # NOTE(review): the HTTP client may collapse the '..' segments before
        # the request is sent, in which case the server only ever sees
        # '/etc/passwd' -- confirm the raw path actually reaches the handler.
        escape_path = '/../../../etc/passwd'
        resp = await client.get(escape_path, headers=basic_auth_header)
        assert resp.status in [403, 404]

    @pytest.mark.asyncio
    async def test_user_isolation(self, client):
        """A file written by ``testuser`` is not visible to ``admin`` at the same path."""
        # Upload as testuser.
        owner_token = base64.b64encode(b'testuser:testpass123').decode('utf-8')
        owner_headers = {'Authorization': f'Basic {owner_token}'}
        await client.put('/myfile.txt', data=b'secret', headers=owner_headers)

        # Read the same path as admin.
        other_token = base64.b64encode(b'admin:adminpass123').decode('utf-8')
        other_headers = {'Authorization': f'Basic {other_token}'}

        # Each user is expected to have an isolated directory, so the admin's
        # /myfile.txt is a different resource from testuser's /myfile.txt.
        resp = await client.get('/myfile.txt', headers=other_headers)
        assert resp.status == 404
# ============================================================================
# Database Tests
# ============================================================================
class TestDatabase:
    """Direct calls to the ``Database`` object from the ``test_db`` fixture."""

    @pytest.mark.asyncio
    async def test_create_user(self, test_db):
        """A newly created user gets a positive id and can be verified."""
        new_id = await test_db.create_user(
            'newuser', 'password123', 'new@example.com'
        )
        assert new_id > 0

        # The same credentials must now verify successfully.
        record = await test_db.verify_user('newuser', 'password123')
        assert record is not None
        assert record['username'] == 'newuser'

    @pytest.mark.asyncio
    async def test_verify_user_wrong_password(self, test_db):
        """Verification with a wrong password yields ``None``."""
        record = await test_db.verify_user('testuser', 'wrongpassword')
        assert record is None

    @pytest.mark.asyncio
    async def test_lock_creation(self, test_db):
        """A created lock has an ``opaquelocktoken:`` token and can be read back."""
        # Passed positionally; presumably (path, user id, lock type, scope,
        # depth, timeout, owner) -- confirm against Database.create_lock.
        lock_args = (
            '/test.txt',
            1,
            'write',
            'exclusive',
            0,
            3600,
            'test@example.com',
        )
        token = await test_db.create_lock(*lock_args)
        assert token.startswith('opaquelocktoken:')

        # Look the lock up by path and compare tokens.
        stored = await test_db.get_lock('/test.txt')
        assert stored is not None
        assert stored['lock_token'] == token

    @pytest.mark.asyncio
    async def test_lock_removal(self, test_db):
        """Removing a lock returns ``True`` and the path is no longer locked."""
        token = await test_db.create_lock('/test.txt', 1)

        was_removed = await test_db.remove_lock(token, 1)
        assert was_removed is True

        # No lock should remain for the path.
        remaining = await test_db.get_lock('/test.txt')
        assert remaining is None

    @pytest.mark.asyncio
    async def test_property_management(self, test_db):
        """A property that was set shows up among the path's properties."""
        await test_db.set_property('/test.txt', 'DAV:', 'displayname', 'My File')

        stored = await test_db.get_properties('/test.txt')
        assert len(stored) > 0
        has_displayname = any(
            entry['property_name'] == 'displayname' for entry in stored
        )
        assert has_displayname
# ============================================================================
# Integration Tests
# ============================================================================
class TestIntegration:
    """Integration tests for complex workflows"""

    @pytest.mark.asyncio
    async def test_complete_workflow(self, client, basic_auth_header):
        """Create, list, copy, move and delete inside one collection.

        The status of every mutating request is asserted, so a failing step
        is reported where it happens rather than showing up later as a
        confusing listing mismatch. The expected codes are the ones the
        per-method tests in this file use; DELETE additionally tolerates 200.
        """
        depth_0 = {**basic_auth_header, 'Depth': '0'}
        depth_1 = {**basic_auth_header, 'Depth': '1'}

        # 1. Create directory
        response = await client.request(
            'MKCOL', '/docs/', headers=basic_auth_header
        )
        assert response.status == 201

        # 2. Upload files
        uploads = (
            ('/docs/file1.txt', b'content1'),
            ('/docs/file2.txt', b'content2'),
        )
        for path, payload in uploads:
            response = await client.put(
                path, data=payload, headers=basic_auth_header
            )
            assert response.status in [201, 204]

        # 3. List directory
        response = await client.request('PROPFIND', '/docs/', headers=depth_1)
        assert response.status == 207
        body = await response.text()
        assert 'file1.txt' in body
        assert 'file2.txt' in body

        # 4. Copy file
        response = await client.request(
            'COPY',
            '/docs/file1.txt',
            headers={**basic_auth_header, 'Destination': '/docs/file1_copy.txt'}
        )
        assert response.status in [201, 204]

        # 5. Move file
        response = await client.request(
            'MOVE',
            '/docs/file2.txt',
            headers={**basic_auth_header, 'Destination': '/docs/renamed.txt'}
        )
        assert response.status in [201, 204]

        # 6. Verify final state
        response = await client.request('PROPFIND', '/docs/', headers=depth_1)
        assert response.status == 207
        body = await response.text()
        assert 'file1.txt' in body
        assert 'file1_copy.txt' in body
        assert 'renamed.txt' in body
        assert 'file2.txt' not in body

        # 7. Delete directory
        response = await client.delete('/docs/', headers=basic_auth_header)
        assert response.status in [200, 204]

        # 8. Verify deletion
        response = await client.request('PROPFIND', '/docs/', headers=depth_0)
        assert response.status == 404
# ============================================================================
# Run Tests
# ============================================================================
if __name__ == '__main__':
    # Propagate pytest's exit code so that running this file as a script
    # reports failure to the shell / CI instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, '-v', '--tb=short']))