"""
Complete WebDAV Server Implementation with aiohttp
Production-ready WebDAV server with full RFC 4918 compliance,
Windows Explorer compatibility, and comprehensive user management.
"""
import os
import asyncio
import aiofiles
import sqlite3
import hashlib
import hmac
import secrets
import mimetypes
import base64
import functools
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, List, Tuple
from xml.etree import ElementTree as ET
from urllib.parse import unquote, quote
from aiohttp import web, BasicAuth
from aiohttp_session import setup as setup_session, get_session
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from cryptography import fernet
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# ============================================================================
# Configuration Management
# ============================================================================
class Config:
"""Centralized configuration management from environment variables"""
# Server Configuration
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', '8080'))
SSL_ENABLED = os.getenv('SSL_ENABLED', 'false').lower() == 'true'
SSL_CERT_PATH = os.getenv('SSL_CERT_PATH', '')
SSL_KEY_PATH = os.getenv('SSL_KEY_PATH', '')
# Database Configuration
DB_PATH = os.getenv('DB_PATH', './webdav.db')
# Authentication Configuration
AUTH_METHODS = os.getenv('AUTH_METHODS', 'basic,digest').split(',')
JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', secrets.token_hex(32))
SESSION_TIMEOUT = int(os.getenv('SESSION_TIMEOUT', '3600'))
PASSWORD_MIN_LENGTH = int(os.getenv('PASSWORD_MIN_LENGTH', '8'))
# WebDAV Configuration
MAX_FILE_SIZE = int(os.getenv('MAX_FILE_SIZE', '104857600')) # 100MB
MAX_PROPFIND_DEPTH = int(os.getenv('MAX_PROPFIND_DEPTH', '3'))
LOCK_TIMEOUT_DEFAULT = int(os.getenv('LOCK_TIMEOUT_DEFAULT', '3600'))
ENABLE_WINDOWS_COMPATIBILITY = os.getenv('ENABLE_WINDOWS_COMPATIBILITY', 'true').lower() == 'true'
# WebDAV Root Directory
WEBDAV_ROOT = os.getenv('WEBDAV_ROOT', './webdav')
# Security Configuration
RATE_LIMIT_ENABLED = os.getenv('RATE_LIMIT_ENABLED', 'true').lower() == 'true'
RATE_LIMIT_REQUESTS = int(os.getenv('RATE_LIMIT_REQUESTS', '100'))
RATE_LIMIT_WINDOW = int(os.getenv('RATE_LIMIT_WINDOW', '60'))
# ============================================================================
# Database Layer
# ============================================================================
class Database:
"""SQLite database management with async wrapper"""
def __init__(self, db_path: str):
self.db_path = db_path
self._connection_lock = asyncio.Lock()
self.init_database()
def get_connection(self) -> sqlite3.Connection:
"""Get database connection with row factory"""
conn = sqlite3.connect(self.db_path, timeout=30.0, check_same_thread=False)
conn.row_factory = sqlite3.Row
# Enable WAL mode for better concurrent access
conn.execute('PRAGMA journal_mode=WAL')
conn.execute('PRAGMA busy_timeout=30000')
conn.execute('PRAGMA synchronous=NORMAL')
return conn
def init_database(self):
"""Initialize database schema"""
conn = self.get_connection()
cursor = conn.cursor()
# Users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
role TEXT DEFAULT 'user'
)
''')
# Sessions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
user_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
ip_address TEXT,
user_agent TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Locks table
cursor.execute('''
CREATE TABLE IF NOT EXISTS locks (
lock_token TEXT PRIMARY KEY,
resource_path TEXT NOT NULL,
user_id INTEGER,
lock_type TEXT DEFAULT 'write',
lock_scope TEXT DEFAULT 'exclusive',
depth INTEGER DEFAULT 0,
timeout_seconds INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
owner TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Properties table
cursor.execute('''
CREATE TABLE IF NOT EXISTS properties (
id INTEGER PRIMARY KEY AUTOINCREMENT,
resource_path TEXT NOT NULL,
namespace TEXT,
property_name TEXT NOT NULL,
property_value TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Permissions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS permissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
resource_path TEXT NOT NULL,
permission_type TEXT NOT NULL,
granted_by INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (granted_by) REFERENCES users (id)
)
''')
# Create indices
cursor.execute('CREATE INDEX IF NOT EXISTS idx_locks_resource ON locks(resource_path)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_properties_resource ON properties(resource_path)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_permissions_user ON permissions(user_id)')
conn.commit()
conn.close()
async def create_user(self, username: str, password: str, email: str = None, role: str = 'user') -> int:
"""Create a new user"""
salt = secrets.token_hex(16)
password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000).hex()
def _create():
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt, role)
VALUES (?, ?, ?, ?, ?)
''', (username, email, password_hash, salt, role))
user_id = cursor.lastrowid
conn.commit()
conn.close()
return user_id
# Run in thread pool to prevent blocking
user_id = await asyncio.get_event_loop().run_in_executor(None, _create)
# Create user directory
user_dir = Path(Config.WEBDAV_ROOT) / 'users' / username
user_dir.mkdir(parents=True, exist_ok=True)
return user_id
def _verify_user(self, username: str, password: str) -> Optional[Dict]:
"""Verify user credentials"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ? AND is_active = 1', (username,))
user = cursor.fetchone()
conn.close()
return dict(user) if user else None
@functools.lru_cache()
def _verify_user(self, username: str, password: str) -> Optional[Dict]:
"""Verify user credentials"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ? AND is_active = 1', (username,))
user = cursor.fetchone()
conn.close()
if not user:
return None
password_hash = hashlib.pbkdf2_hmac('sha256', password.encode(), user['salt'].encode(), 100000).hex()
if password_hash == user['password_hash']:
return dict(user)
return None
async def verify_user(self, username: str, password: str) -> Optional[Dict]:
"""Verify user credentials"""
return self._verify_user(username, password)
async def get_user_by_id(self, user_id: int) -> Optional[Dict]:
"""Get user by ID"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
user = cursor.fetchone()
conn.close()
return dict(user) if user else None
async def create_lock(self, resource_path: str, user_id: int, lock_type: str = 'write',
lock_scope: str = 'exclusive', depth: int = 0,
timeout: int = None, owner: str = None) -> str:
"""Create a resource lock"""
lock_token = f"opaquelocktoken:{secrets.token_urlsafe(32)}"
timeout = timeout or Config.LOCK_TIMEOUT_DEFAULT
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO locks (lock_token, resource_path, user_id, lock_type, lock_scope,
depth, timeout_seconds, owner)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (lock_token, resource_path, user_id, lock_type, lock_scope, depth, timeout, owner))
conn.commit()
conn.close()
return lock_token
async def get_lock(self, resource_path: str) -> Optional[Dict]:
"""Get lock for a resource"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM locks WHERE resource_path = ?
AND datetime(created_at, '+' || timeout_seconds || ' seconds') > datetime('now')
''', (resource_path,))
lock = cursor.fetchone()
conn.close()
return dict(lock) if lock else None
async def remove_lock(self, lock_token: str, user_id: int) -> bool:
"""Remove a lock"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('DELETE FROM locks WHERE lock_token = ? AND user_id = ?', (lock_token, user_id))
deleted = cursor.rowcount > 0
conn.commit()
conn.close()
return deleted
async def set_property(self, resource_path: str, namespace: str,
property_name: str, property_value: str):
"""Set a custom property on a resource"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO properties (resource_path, namespace, property_name, property_value)
VALUES (?, ?, ?, ?)
''', (resource_path, namespace, property_name, property_value))
conn.commit()
conn.close()
async def remove_property(self, resource_path: str, namespace: str, property_name: str):
"""Remove a custom property from a resource"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
DELETE FROM properties
WHERE resource_path = ? AND property_name = ?
AND (namespace = ? OR (namespace IS NULL AND ? = ''))
''', (resource_path, property_name, namespace, namespace))
conn.commit()
conn.close()
async def get_properties(self, resource_path: str) -> List[Dict]:
"""Get all properties for a resource"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM properties WHERE resource_path = ?
''', (resource_path,))
properties = cursor.fetchall()
conn.close()
return [dict(prop) for prop in properties]
# ============================================================================
# XML Utilities for WebDAV
# ============================================================================
class WebDAVXML:
"""XML processing utilities for WebDAV protocol"""
# WebDAV namespaces
NS = {
'D': 'DAV:',
'MS': 'urn:schemas-microsoft-com:',
'C': 'http://webdav.org/custom/' # Custom properties namespace
}
@staticmethod
def register_namespaces():
"""Register XML namespaces"""
for prefix, uri in WebDAVXML.NS.items():
ET.register_namespace(prefix, uri)
@staticmethod
def create_multistatus() -> ET.Element:
"""Create multistatus response root"""
WebDAVXML.register_namespaces()
return ET.Element('{DAV:}multistatus')
@staticmethod
def create_response(href: str) -> ET.Element:
"""Create response element"""
response = ET.Element('{DAV:}response')
href_elem = ET.SubElement(response, '{DAV:}href')
href_elem.text = href
return response
@staticmethod
def add_propstat(response: ET.Element, props: Dict[str, str], status: str = '200 OK'):
"""Add propstat element to response"""
propstat = ET.SubElement(response, '{DAV:}propstat')
prop = ET.SubElement(propstat, '{DAV:}prop')
is_collection = props.pop('_is_collection', False)
# Standard DAV properties - these should be in DAV: namespace
standard_props = {
'resourcetype', 'creationdate', 'getlastmodified',
'getcontentlength', 'getcontenttype', 'displayname',
'getetag', 'lockdiscovery', 'supportedlock'
}
for prop_name, prop_value in props.items():
# Check if property is already in {namespace}name format
if prop_name.startswith('{'):
# Already has namespace in Clark notation
prop_elem = ET.SubElement(prop, prop_name)
# Handle colon-separated namespace prefix (e.g., "D:displayname")
elif ':' in prop_name and not prop_name.startswith('http'):
ns_prefix, name = prop_name.split(':', 1)
ns_uri = WebDAVXML.NS.get(ns_prefix, ns_prefix)
prop_elem = ET.SubElement(prop, f'{{{ns_uri}}}{name}')
# Standard DAV properties
elif prop_name.lower() in standard_props:
prop_elem = ET.SubElement(prop, f'{{DAV:}}{prop_name}')
# Custom properties without explicit namespace - use custom namespace
else:
prop_elem = ET.SubElement(prop, f'{{http://webdav.org/custom/}}{prop_name}')
# Special handling for resourcetype
if prop_name == 'resourcetype' or prop_name == '{DAV:}resourcetype':
if is_collection:
ET.SubElement(prop_elem, '{DAV:}collection')
# Empty for non-collections
elif prop_value is not None:
prop_elem.text = str(prop_value)
status_elem = ET.SubElement(propstat, '{DAV:}status')
status_elem.text = f'HTTP/1.1 {status}'
@staticmethod
def serialize(element: ET.Element) -> str:
"""Serialize XML element to string"""
# Register all namespaces before serializing
WebDAVXML.register_namespaces()
return ET.tostring(element, encoding='unicode', method='xml')
@staticmethod
def parse_propfind(body: bytes) -> Tuple[str, List[str]]:
"""Parse PROPFIND request body"""
if not body:
return 'allprop', []
try:
root = ET.fromstring(body)
# Check for allprop
if root.find('.//{DAV:}allprop') is not None:
return 'allprop', []
# Check for propname
if root.find('.//{DAV:}propname') is not None:
return 'propname', []
# Get specific properties
prop_elem = root.find('.//{DAV:}prop')
if prop_elem is not None:
props = []
for child in prop_elem:
props.append(child.tag)
return 'prop', props
return 'allprop', []
except Exception:
return 'allprop', []
# ============================================================================
# Authentication and Authorization
# ============================================================================
class AuthHandler:
"""Handle multiple authentication methods"""
def __init__(self, db: Database):
self.db = db
self.nonces = {} # Store nonces for digest auth
async def authenticate_basic(self, request: web.Request) -> Optional[Dict]:
"""Basic authentication"""
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Basic '):
return None
try:
credentials = base64.b64decode(auth_header[6:]).decode('utf-8')
username, password = credentials.split(':', 1)
return await self.db.verify_user(username, password)
except Exception:
return None
def generate_digest_challenge(self, realm: str = 'WebDAV Server') -> str:
"""Generate digest authentication challenge"""
nonce = secrets.token_hex(16)
opaque = secrets.token_hex(16)
self.nonces[nonce] = datetime.now()
return f'Digest realm="{realm}", qop="auth", nonce="{nonce}", opaque="{opaque}"'
async def authenticate_digest(self, request: web.Request) -> Optional[Dict]:
"""Digest authentication"""
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Digest '):
return None
# Parse digest parameters
params = {}
for item in auth_header[7:].split(','):
key, value = item.strip().split('=', 1)
params[key] = value.strip('"')
username = params.get('username')
nonce = params.get('nonce')
uri = params.get('uri')
response = params.get('response')
if not all([username, nonce, uri, response]):
return None
# Verify nonce
if nonce not in self.nonces:
return None
# Get user from database
conn = self.db.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ? AND is_active = 1', (username,))
user = cursor.fetchone()
conn.close()
if not user:
return None
# Calculate expected response
ha1 = hashlib.md5(f"{username}:WebDAV Server:{user['password_hash']}".encode()).hexdigest()
ha2 = hashlib.md5(f"{request.method}:{uri}".encode()).hexdigest()
expected_response = hashlib.md5(f"{ha1}:{nonce}:{ha2}".encode()).hexdigest()
if response == expected_response:
return dict(user)
return None
async def authenticate(self, request: web.Request) -> Optional[Dict]:
"""Authenticate request using configured methods"""
if 'basic' in Config.AUTH_METHODS:
user = await self.authenticate_basic(request)
if user:
return user
if 'digest' in Config.AUTH_METHODS:
user = await self.authenticate_digest(request)
if user:
return user
return None
def require_auth_response(self) -> web.Response:
"""Return 401 response with authentication challenges"""
challenges = []
if 'basic' in Config.AUTH_METHODS:
challenges.append('Basic realm="WebDAV Server"')
if 'digest' in Config.AUTH_METHODS:
challenges.append(self.generate_digest_challenge())
return web.Response(
status=401,
headers={
'WWW-Authenticate': ', '.join(challenges),
'DAV': '1, 2, 3',
},
text='Unauthorized'
)
# ============================================================================
# WebDAV Handler
# ============================================================================
class WebDAVHandler:
"""Main WebDAV protocol handler"""
def __init__(self, db: Database, auth: AuthHandler):
self.db = db
self.auth = auth
WebDAVXML.register_namespaces()
def get_user_root(self, username: str) -> Path:
"""Get user's root directory"""
return Path(Config.WEBDAV_ROOT) / 'users' / username
def get_physical_path(self, username: str, webdav_path: str) -> Path:
"""Convert WebDAV path to physical file system path"""
# Remove DavWWWRoot if present (Windows compatibility)
webdav_path = webdav_path.replace('/DavWWWRoot/', '/')
webdav_path = webdav_path.replace('\\DavWWWRoot\\', '/')
# Normalize path
webdav_path = unquote(webdav_path)
webdav_path = webdav_path.lstrip('/')
# Build physical path
user_root = self.get_user_root(username)
physical_path = user_root / webdav_path
# Ensure path is within user directory (security)
try:
physical_path.resolve().relative_to(user_root.resolve())
except ValueError:
raise web.HTTPForbidden(text="Access denied")
return physical_path
async def handle_options(self, request: web.Request, user: Dict) -> web.Response:
"""Handle OPTIONS method"""
return web.Response(
status=200,
headers={
'DAV': '1, 2',
'MS-Author-Via': 'DAV',
'Allow': 'OPTIONS, GET, HEAD, POST, PUT, DELETE, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE, LOCK, UNLOCK',
'Access-Control-Allow-Methods': 'OPTIONS, GET, HEAD, POST, PUT, DELETE, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE, LOCK, UNLOCK',
}
)
async def handle_get(self, request: web.Request, user: Dict) -> web.Response:
"""Handle GET method"""
path = self.get_physical_path(user['username'], request.path)
if not path.exists():
raise web.HTTPNotFound()
if path.is_dir():
# Return directory listing for browsers
return await self.generate_directory_listing(path, request.path)
# Return file content
content_type, _ = mimetypes.guess_type(str(path))
async with aiofiles.open(path, 'rb') as f:
content = await f.read()
return web.Response(
body=content,
content_type=content_type or 'application/octet-stream',
headers={
'Content-Length': str(len(content)),
'Accept-Ranges': 'bytes',
}
)
async def handle_head(self, request: web.Request, user: Dict) -> web.Response:
"""Handle HEAD method"""
path = self.get_physical_path(user['username'], request.path)
if not path.exists():
raise web.HTTPNotFound()
content_type, _ = mimetypes.guess_type(str(path))
if path.is_file():
size = path.stat().st_size
return web.Response(
headers={
'Content-Type': content_type or 'application/octet-stream',
'Content-Length': str(size),
}
)
return web.Response(
headers={
'Content-Type': 'httpd/unix-directory',
}
)
async def handle_put(self, request: web.Request, user: Dict) -> web.Response:
"""Handle PUT method"""
path = self.get_physical_path(user['username'], request.path)
# Check if locked
lock = await self.db.get_lock(request.path)
if lock and lock['user_id'] != user['id']:
raise web.HTTPLocked()
# Create parent directories
path.parent.mkdir(parents=True, exist_ok=True)
# Write file atomically using temporary file
temp_path = path.with_suffix(path.suffix + '.tmp')
try:
async with aiofiles.open(temp_path, 'wb') as f:
async for chunk in request.content.iter_chunked(8192):
await f.write(chunk)
# Move temporary file to final location
temp_path.replace(path)
status = 201 if not path.exists() else 204
return web.Response(status=status)
except Exception as e:
if temp_path.exists():
temp_path.unlink()
raise web.HTTPInternalServerError(text=str(e))
async def handle_delete(self, request: web.Request, user: Dict) -> web.Response:
"""Handle DELETE method"""
path = self.get_physical_path(user['username'], request.path)
if not path.exists():
raise web.HTTPNotFound()
# Check if locked
lock = await self.db.get_lock(request.path)
if lock and lock['user_id'] != user['id']:
raise web.HTTPLocked()
try:
if path.is_dir():
import shutil
shutil.rmtree(path)
else:
path.unlink()
return web.Response(status=204)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_mkcol(self, request: web.Request, user: Dict) -> web.Response:
"""Handle MKCOL method"""
path = self.get_physical_path(user['username'], request.path)
if path.exists():
raise web.HTTPMethodNotAllowed(method='MKCOL', allowed_methods=[])
# Check if parent exists
if not path.parent.exists():
raise web.HTTPConflict()
try:
path.mkdir(parents=False, exist_ok=False)
return web.Response(status=201)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_propfind(self, request: web.Request, user: Dict) -> web.Response:
"""Handle PROPFIND method"""
path = self.get_physical_path(user['username'], request.path)
if not path.exists():
raise web.HTTPNotFound()
# Get depth header
depth_header = request.headers.get('Depth', '1')
if depth_header == 'infinity':
depth = Config.MAX_PROPFIND_DEPTH
else:
try:
depth = int(depth_header)
except ValueError:
depth = 1
# Parse request body
body = await request.read()
prop_request, specific_props = WebDAVXML.parse_propfind(body)
# Build multistatus response
multistatus = WebDAVXML.create_multistatus()
# Add resources
await self.add_resource_props(multistatus, path, request.path, user, depth, prop_request)
xml_response = '<?xml version="1.0" encoding="utf-8"?>\n' + WebDAVXML.serialize(multistatus)
return web.Response(
status=207,
content_type='application/xml',
charset='utf-8',
text=xml_response,
headers={
'DAV': '1, 2',
'MS-Author-Via': 'DAV',
}
)
async def add_resource_props(self, multistatus: ET.Element, path: Path,
href: str, user: Dict, depth: int, prop_request: str):
"""Add resource properties to multistatus response"""
# Add current resource
response = WebDAVXML.create_response(quote(href))
props = await self.get_resource_properties(path, user)
WebDAVXML.add_propstat(response, props)
multistatus.append(response)
# Add children if directory and depth > 0
if depth > 0 and path.is_dir():
try:
for child in path.iterdir():
child_href = href.rstrip('/') + '/' + child.name
if child.is_dir():
child_href += '/'
await self.add_resource_props(
multistatus, child, child_href, user, depth - 1, prop_request
)
except PermissionError:
pass
async def get_resource_properties(self, path: Path, user: Dict) -> Dict[str, str]:
"""Get standard WebDAV properties for a resource"""
props = {}
stat = path.stat()
# Resource type - use None for proper XML nesting
if path.is_dir():
props['resourcetype'] = None # Will be handled specially
props['_is_collection'] = True
else:
props['resourcetype'] = None
props['_is_collection'] = False
# Creation date
props['creationdate'] = datetime.fromtimestamp(stat.st_ctime).strftime('%Y-%m-%dT%H:%M:%SZ')
# Last modified
props['getlastmodified'] = datetime.fromtimestamp(stat.st_mtime).strftime('%a, %d %b %Y %H:%M:%S GMT')
# Content length
if path.is_file():
props['getcontentlength'] = str(stat.st_size)
# Content type
if path.is_file():
content_type, _ = mimetypes.guess_type(str(path))
props['getcontenttype'] = content_type or 'application/octet-stream'
# Display name
props['displayname'] = path.name
# Get custom properties from database
custom_props = await self.db.get_properties(str(path))
for prop in custom_props:
# Handle properties with full namespace URIs in the name
prop_name = prop['property_name']
namespace = prop['namespace'] or ''
# Check if property name contains a colon (namespace separator)
# This happens when cadaver stores properties
if ':' in prop_name and not namespace:
# Extract namespace from property name
# e.g., "http://webdav.org/cadaver/custom-properties/:xxx"
# Split only on the LAST colon to get the actual property name
parts = prop_name.rsplit(':', 1)
if len(parts) == 2:
namespace = parts[0] + ':'
prop_name = parts[1]
# Create property key with namespace
if namespace:
# Store as full namespace URI + property name for ElementTree
key = f'{{{namespace}}}{prop_name}'
else:
key = prop_name
props[key] = prop['property_value']
return props
async def handle_proppatch(self, request: web.Request, user: Dict) -> web.Response:
"""Handle PROPPATCH method"""
path = self.get_physical_path(user['username'], request.path)
if not path.exists():
raise web.HTTPNotFound()
# Check if locked
lock = await self.db.get_lock(request.path)
if lock and lock['user_id'] != user['id']:
raise web.HTTPLocked()
# Parse request body
body = await request.read()
try:
root = ET.fromstring(body)
multistatus = WebDAVXML.create_multistatus()
response = WebDAVXML.create_response(quote(request.path))
# Process set operations
for set_elem in root.findall('.//{DAV:}set/{DAV:}prop'):
for prop_elem in set_elem:
namespace = prop_elem.tag.split('}')[0].strip('{') if '}' in prop_elem.tag else ''
prop_name = prop_elem.tag.split('}')[1] if '}' in prop_elem.tag else prop_elem.tag
prop_value = prop_elem.text or ''
await self.db.set_property(str(path), namespace, prop_name, prop_value)
# Process remove operations
for remove_elem in root.findall('.//{DAV:}remove/{DAV:}prop'):
for prop_elem in remove_elem:
namespace = prop_elem.tag.split('}')[0].strip('{') if '}' in prop_elem.tag else ''
prop_name = prop_elem.tag.split('}')[1] if '}' in prop_elem.tag else prop_elem.tag
# Remove property using database method
await self.db.remove_property(str(path), namespace, prop_name)
WebDAVXML.add_propstat(response, {}, '200 OK')
multistatus.append(response)
xml_response = '<?xml version="1.0" encoding="utf-8"?>\n' + WebDAVXML.serialize(multistatus)
return web.Response(
status=207,
content_type='application/xml',
charset='utf-8',
text=xml_response
)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_copy(self, request: web.Request, user: Dict) -> web.Response:
"""Handle COPY method"""
src_path = self.get_physical_path(user['username'], request.path)
if not src_path.exists():
raise web.HTTPNotFound()
# Get destination
destination = request.headers.get('Destination')
if not destination:
raise web.HTTPBadRequest(text="Missing Destination header")
# Parse destination URL
from urllib.parse import urlparse
dest_url = urlparse(destination)
dest_path = self.get_physical_path(user['username'], dest_url.path)
# Check overwrite
overwrite = request.headers.get('Overwrite', 'T') == 'T'
if dest_path.exists() and not overwrite:
raise web.HTTPPreconditionFailed()
try:
import shutil
if src_path.is_dir():
shutil.copytree(src_path, dest_path, dirs_exist_ok=overwrite)
else:
dest_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_path, dest_path)
status = 204 if dest_path.exists() else 201
return web.Response(status=status)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_move(self, request: web.Request, user: Dict) -> web.Response:
"""Handle MOVE method"""
src_path = self.get_physical_path(user['username'], request.path)
if not src_path.exists():
raise web.HTTPNotFound()
# Check if locked
lock = await self.db.get_lock(request.path)
if lock and lock['user_id'] != user['id']:
raise web.HTTPLocked()
# Get destination
destination = request.headers.get('Destination')
if not destination:
raise web.HTTPBadRequest(text="Missing Destination header")
# Parse destination URL
from urllib.parse import urlparse
dest_url = urlparse(destination)
dest_path = self.get_physical_path(user['username'], dest_url.path)
# Check overwrite
overwrite = request.headers.get('Overwrite', 'T') == 'T'
if dest_path.exists() and not overwrite:
raise web.HTTPPreconditionFailed()
try:
dest_path.parent.mkdir(parents=True, exist_ok=True)
src_path.replace(dest_path)
status = 204 if dest_path.exists() else 201
return web.Response(status=status)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_lock(self, request: web.Request, user: Dict) -> web.Response:
"""Handle LOCK method"""
path = self.get_physical_path(user['username'], request.path)
# Parse lock request
body = await request.read()
try:
root = ET.fromstring(body)
# Get lock scope and type
lock_scope = 'exclusive'
if root.find('.//{DAV:}shared') is not None:
lock_scope = 'shared'
lock_type = 'write'
# Get timeout
timeout_header = request.headers.get('Timeout', f'Second-{Config.LOCK_TIMEOUT_DEFAULT}')
timeout = Config.LOCK_TIMEOUT_DEFAULT
if timeout_header.startswith('Second-'):
try:
timeout = int(timeout_header.split('-')[1])
except:
pass
# Get owner info
owner_elem = root.find('.//{DAV:}owner')
owner = ET.tostring(owner_elem, encoding='unicode') if owner_elem is not None else None
# Create lock
lock_token = await self.db.create_lock(
request.path, user['id'], lock_type, lock_scope, 0, timeout, owner
)
# Build lock response
response_xml = f'''<?xml version="1.0" encoding="utf-8"?>
<D:prop xmlns:D="DAV:">
<D:lockdiscovery>
<D:activelock>
<D:locktype><D:write/></D:locktype>
<D:lockscope><D:{lock_scope}/></D:lockscope>
<D:depth>0</D:depth>
<D:timeout>Second-{timeout}</D:timeout>
<D:locktoken>
<D:href>{lock_token}</D:href>
</D:locktoken>
{owner or ''}
</D:activelock>
</D:lockdiscovery>
</D:prop>'''
return web.Response(
status=200,
content_type='application/xml',
charset='utf-8',
text=response_xml,
headers={
'Lock-Token': f'<{lock_token}>',
}
)
except Exception as e:
raise web.HTTPInternalServerError(text=str(e))
async def handle_unlock(self, request: web.Request, user: Dict) -> web.Response:
"""Handle UNLOCK method"""
lock_token = request.headers.get('Lock-Token', '').strip('<>')
if not lock_token:
raise web.HTTPBadRequest(text="Missing Lock-Token header")
removed = await self.db.remove_lock(lock_token, user['id'])
if removed:
return web.Response(status=204)
else:
raise web.HTTPConflict(text="Lock not found or not owned by user")
async def generate_directory_listing(self, path: Path, href: str) -> web.Response:
"""Generate HTML directory listing"""
html = f'''<!DOCTYPE html>
<html>
<head>
<title>Index of {href}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1 {{ border-bottom: 1px solid #ccc; }}
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid #ddd; }}
th {{ background-color: #f2f2f2; }}
a {{ text-decoration: none; color: #0066cc; }}
a:hover {{ text-decoration: underline; }}
</style>
</head>
<body>
<h1>Index of {href}</h1>
<table>
<tr><th>Name</th><th>Size</th><th>Modified</th></tr>
'''
# Add parent directory link
if href != '/':
parent = '/'.join(href.rstrip('/').split('/')[:-1]) or '/'
html += f'<tr><td><a href="{parent}">..</a></td><td>-</td><td>-</td></tr>'
# List directory contents
items = sorted(path.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower()))
for item in items:
name = item.name
if item.is_dir():
name += '/'
size = '-' if item.is_dir() else f'{item.stat().st_size:,}'
modified = datetime.fromtimestamp(item.stat().st_mtime).strftime('%Y-%m-%d %H:%M')
item_href = href.rstrip('/') + '/' + item.name
if item.is_dir():
item_href += '/'
html += f'<tr><td><a href="{quote(item_href)}">{name}</a></td><td>{size}</td><td>{modified}</td></tr>'
html += '''
</table>
</body>
</html>'''
return web.Response(text=html, content_type='text/html')
# ============================================================================
# Web Application
# ============================================================================
async def webdav_middleware(app, handler):
"""Middleware to handle authentication and routing"""
async def middleware_handler(request: web.Request):
# Handle OPTIONS without authentication
if request.method == 'OPTIONS':
webdav = app['webdav']
return await webdav.handle_options(request, {})
# Authenticate user
user = await app['auth'].authenticate(request)
if not user:
return app['auth'].require_auth_response()
# Store user in request
request['user'] = user
# Route to appropriate handler
webdav = app['webdav']
if request.method == 'GET':
return await webdav.handle_get(request, user)
elif request.method == 'HEAD':
return await webdav.handle_head(request, user)
elif request.method == 'PUT':
return await webdav.handle_put(request, user)
elif request.method == 'DELETE':
return await webdav.handle_delete(request, user)
elif request.method == 'MKCOL':
return await webdav.handle_mkcol(request, user)
elif request.method == 'PROPFIND':
return await webdav.handle_propfind(request, user)
elif request.method == 'PROPPATCH':
return await webdav.handle_proppatch(request, user)
elif request.method == 'COPY':
return await webdav.handle_copy(request, user)
elif request.method == 'MOVE':
return await webdav.handle_move(request, user)
elif request.method == 'LOCK':
return await webdav.handle_lock(request, user)
elif request.method == 'UNLOCK':
return await webdav.handle_unlock(request, user)
elif request.method == 'OPTIONS':
return await webdav.handle_options(request, user)
else:
raise web.HTTPMethodNotAllowed(method=request.method, allowed_methods=[])
return middleware_handler
async def init_app() -> web.Application:
"""Initialize web application"""
# Create application
app = web.Application(
client_max_size=Config.MAX_FILE_SIZE,
middlewares=[]
)
# Initialize database
db = Database(Config.DB_PATH)
app['db'] = db
# Initialize authentication
auth = AuthHandler(db)
app['auth'] = auth
# Initialize WebDAV handler
webdav = WebDAVHandler(db, auth)
app['webdav'] = webdav
# Setup session
secret_key = Config.JWT_SECRET_KEY.encode()
setup_session(app, EncryptedCookieStorage(secret_key))
# Add middleware
app.middlewares.append(webdav_middleware)
# Add catch-all route for WebDAV
app.router.add_route('*', '/{path:.*}', lambda r: web.Response(status=200))
return app
async def create_default_user(db: Database):
"""Create default admin user if no users exist"""
conn = db.get_connection()
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) as count FROM users')
count = cursor.fetchone()['count']
conn.close()
if count == 0:
print("Creating default admin user...")
await db.create_user('admin', 'admin123', 'admin@webdav.local', 'admin')
print("Default user created: admin / admin123")
print("Please change this password immediately!")
def main(**kwargs):
"""Main entry point"""
# Ensure WebDAV root directory exists
Path(Config.WEBDAV_ROOT).mkdir(parents=True, exist_ok=True)
# Initialize database and create default user
db = Database(Config.DB_PATH)
asyncio.run(create_default_user(db))
# Create and run application
app = asyncio.run(init_app())
print(f"Starting WebDAV Server on {Config.HOST}:{Config.PORT}")
print(f"WebDAV URL: http://{Config.HOST}:{Config.PORT}/")
print(f"Authentication methods: {', '.join(Config.AUTH_METHODS)}")
web.run_app(
app,
host=Config.HOST,
port=Config.PORT,
access_log_format='%a %t "%r" %s %b "%{Referer}i" "%{User-Agent}i"',
**kwargs
)
if __name__ == '__main__':
main()