|
# Written by retoor@molodetz.nl
|
|
|
|
# The script implements a WebDAV server using the aiohttp library. It includes functionalities for authentication, file handling (GET, PUT, DELETE), and folder handling (MKCOL, MOVE, COPY), with support for WebDAV-specific requests like PROPFIND, PROPPATCH, LOCK, and UNLOCK.
|
|
|
|
# External dependencies used include aiohttp for handling HTTP requests, aiofiles for asynchronous file handling, base64 for encoding, lxml for XML handling, and others like mimetypes, uuid, and asyncio.
|
|
|
|
# MIT License
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
|
|
import logging
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
import asyncio
|
|
import base64
|
|
import datetime
|
|
import mimetypes
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import uuid
|
|
|
|
import aiofiles
|
|
import aiohttp
|
|
import aiohttp.web
|
|
from lxml import etree
|
|
|
|
WEBROOT = "root"
|
|
USERNAME = "retoor"
|
|
PASSWORD = "retoor"
|
|
NAMESPACE = "DAV:"
|
|
|
|
|
|
class WebDavApplication(aiohttp.web.Application):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.locks = {}
|
|
self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options)
|
|
self.router.add_route("GET", "/{filename:.*}", self.handle_get)
|
|
self.router.add_route("PUT", "/{filename:.*}", self.handle_put)
|
|
self.router.add_route("DELETE", "/{filename:.*}", self.handle_delete)
|
|
self.router.add_route("MKCOL", "/{filename:.*}", self.handle_mkcol)
|
|
self.router.add_route("MOVE", "/{filename:.*}", self.handle_move)
|
|
self.router.add_route("COPY", "/{filename:.*}", self.handle_copy)
|
|
self.router.add_route("PROPFIND", "/{filename:.*}", self.handle_propfind)
|
|
self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch)
|
|
self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock)
|
|
self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock)
|
|
|
|
async def authenticate(self, request):
|
|
auth_header = request.headers.get("Authorization", "")
|
|
if not auth_header.startswith("Basic "):
|
|
return False
|
|
encoded_creds = auth_header.split("Basic ")[1]
|
|
decoded_creds = base64.b64decode(encoded_creds).decode()
|
|
user, pwd = decoded_creds.split(":", 1)
|
|
return user == USERNAME and pwd == PASSWORD
|
|
|
|
async def handle_get(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
|
|
requested_path = request.match_info.get("filename", "")
|
|
abs_path = os.path.join(WEBROOT, requested_path)
|
|
|
|
if not os.path.exists(abs_path):
|
|
return aiohttp.web.Response(status=404, text="File not found")
|
|
|
|
if os.path.isdir(abs_path):
|
|
return aiohttp.web.Response(status=403, text="Cannot download a directory")
|
|
|
|
content_type, _ = mimetypes.guess_type(abs_path)
|
|
content_type = content_type or "application/octet-stream"
|
|
|
|
return aiohttp.web.FileResponse(
|
|
path=abs_path, headers={"Content-Type": content_type}, chunk_size=8192
|
|
)
|
|
|
|
async def handle_put(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
file_path = os.path.join(WEBROOT, request.match_info["filename"])
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
|
async with aiofiles.open(file_path, "wb") as f:
|
|
while chunk := await request.content.read(1024):
|
|
await f.write(chunk)
|
|
return aiohttp.web.Response(status=201, text="File uploaded")
|
|
|
|
async def handle_delete(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
file_path = os.path.join(WEBROOT, request.match_info["filename"])
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
return aiohttp.web.Response(status=204)
|
|
elif os.path.isdir(file_path):
|
|
shutil.rmtree(file_path)
|
|
return aiohttp.web.Response(status=204)
|
|
return aiohttp.web.Response(status=404, text="Not found")
|
|
|
|
async def handle_mkcol(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
dir_path = os.path.join(WEBROOT, request.match_info["filename"])
|
|
if os.path.exists(dir_path):
|
|
return aiohttp.web.Response(status=405, text="Directory already exists")
|
|
os.makedirs(dir_path, exist_ok=True)
|
|
return aiohttp.web.Response(status=201, text="Directory created")
|
|
|
|
async def handle_move(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
src_path = os.path.join(WEBROOT, request.match_info["filename"])
|
|
dest_path = os.path.join(
|
|
WEBROOT,
|
|
request.headers.get("Destination", "").replace(
|
|
"http://localhost:8080/", ""
|
|
),
|
|
)
|
|
if not os.path.exists(src_path):
|
|
return aiohttp.web.Response(status=404, text="Source not found")
|
|
shutil.move(src_path, dest_path)
|
|
return aiohttp.web.Response(status=201, text="Moved successfully")
|
|
|
|
async def handle_copy(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
src_path = os.path.join(WEBROOT, request.match_info["filename"])
|
|
dest_path = os.path.join(
|
|
WEBROOT,
|
|
request.headers.get("Destination", "").replace(
|
|
"http://localhost:8080/", ""
|
|
),
|
|
)
|
|
if not os.path.exists(src_path):
|
|
return aiohttp.web.Response(status=404, text="Source not found")
|
|
if os.path.isfile(src_path):
|
|
shutil.copy2(src_path, dest_path)
|
|
else:
|
|
shutil.copytree(src_path, dest_path)
|
|
return aiohttp.web.Response(status=201, text="Copied successfully")
|
|
|
|
async def handle_options(self, request):
|
|
headers = {
|
|
"DAV": "1, 2",
|
|
"Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH",
|
|
}
|
|
return aiohttp.web.Response(status=200, headers=headers)
|
|
|
|
def get_current_utc_time(self, filepath):
|
|
if os.path.exists(filepath):
|
|
modified_time = datetime.datetime.utcfromtimestamp(
|
|
os.path.getmtime(filepath)
|
|
)
|
|
else:
|
|
modified_time = datetime.datetime.utcnow()
|
|
return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ"), modified_time.strftime(
|
|
"%a, %d %b %Y %H:%M:%S GMT"
|
|
)
|
|
|
|
def get_directory_size(self, directory):
|
|
total_size = 0
|
|
for dirpath, _, filenames in os.walk(directory):
|
|
for f in filenames:
|
|
fp = os.path.join(dirpath, f)
|
|
if os.path.exists(fp):
|
|
total_size += os.path.getsize(fp)
|
|
return total_size
|
|
|
|
def get_disk_free_space(self, path):
|
|
statvfs = os.statvfs(path)
|
|
return statvfs.f_bavail * statvfs.f_frsize
|
|
|
|
async def handle_propfind(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
requested_path = request.match_info.get("filename", "")
|
|
abs_path = os.path.join(WEBROOT, requested_path)
|
|
if not os.path.exists(abs_path):
|
|
return aiohttp.web.Response(status=404, text="Directory not found")
|
|
nsmap = {"D": "DAV:"}
|
|
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
|
|
directories = [requested_path]
|
|
if os.path.isdir(abs_path):
|
|
directories.extend(os.listdir(abs_path))
|
|
for item in directories:
|
|
full_path = (
|
|
os.path.join(abs_path, item) if item != requested_path else abs_path
|
|
)
|
|
href_path = (
|
|
f"/{requested_path}/{item}/"
|
|
if item != requested_path
|
|
else f"/{requested_path}/"
|
|
)
|
|
href_path = href_path.replace("//", "/")
|
|
response = etree.SubElement(response_xml, "{DAV:}response")
|
|
href = etree.SubElement(response, "{DAV:}href")
|
|
if not os.path.isdir(full_path):
|
|
href_path = href_path.rstrip("/")
|
|
href.text = href_path
|
|
propstat = etree.SubElement(response, "{DAV:}propstat")
|
|
prop = etree.SubElement(propstat, "{DAV:}prop")
|
|
res_type = etree.SubElement(prop, "{DAV:}resourcetype")
|
|
if os.path.isdir(full_path):
|
|
etree.SubElement(res_type, "{DAV:}collection")
|
|
creation_date, last_modified = self.get_current_utc_time(full_path)
|
|
etree.SubElement(prop, "{DAV:}creationdate").text = creation_date
|
|
etree.SubElement(prop, "{DAV:}quota-used-bytes").text = str(
|
|
os.path.getsize(full_path)
|
|
if os.path.isfile(full_path)
|
|
else self.get_directory_size(full_path)
|
|
)
|
|
etree.SubElement(prop, "{DAV:}quota-available-bytes").text = str(
|
|
self.get_disk_free_space(WEBROOT)
|
|
)
|
|
etree.SubElement(prop, "{DAV:}getlastmodified").text = last_modified
|
|
etree.SubElement(prop, "{DAV:}displayname").text = os.path.basename(
|
|
full_path
|
|
)
|
|
etree.SubElement(prop, "{DAV:}lockdiscovery")
|
|
mimetype, _ = mimetypes.guess_type(os.path.basename(full_path))
|
|
etree.SubElement(prop, "{DAV:}contenttype").text = mimetype
|
|
etree.SubElement(prop, "{DAV:}getcontentlength").text = str(
|
|
os.path.getsize(full_path)
|
|
if os.path.isfile(full_path)
|
|
else self.get_directory_size(full_path)
|
|
)
|
|
supported_lock = etree.SubElement(prop, "{DAV:}supportedlock")
|
|
lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
|
|
lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
|
|
etree.SubElement(lock_scope_1, "{DAV:}exclusive")
|
|
lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
|
|
etree.SubElement(lock_type_1, "{DAV:}write")
|
|
lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
|
|
lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
|
|
etree.SubElement(lock_scope_2, "{DAV:}shared")
|
|
lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
|
|
etree.SubElement(lock_type_2, "{DAV:}write")
|
|
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
|
|
xml_output = etree.tostring(
|
|
response_xml, encoding="utf-8", xml_declaration=True
|
|
).decode()
|
|
return aiohttp.web.Response(
|
|
status=207, text=xml_output, content_type="application/xml"
|
|
)
|
|
|
|
async def handle_proppatch(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)")
|
|
|
|
async def handle_lock(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
resource = request.match_info.get("filename", "/")
|
|
lock_id = str(uuid.uuid4())
|
|
self.locks[resource] = lock_id
|
|
xml_response = self.generate_lock_response(lock_id)
|
|
headers = {
|
|
"Lock-Token": f"opaquelocktoken:{lock_id}",
|
|
"Content-Type": "application/xml",
|
|
}
|
|
return aiohttp.web.Response(text=xml_response, headers=headers, status=200)
|
|
|
|
async def handle_unlock(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
resource = request.match_info.get("filename", "/")
|
|
lock_token = request.headers.get("Lock-Token", "").replace(
|
|
"opaquelocktoken:", ""
|
|
)
|
|
if self.locks.get(resource) == lock_token:
|
|
del self.locks[resource]
|
|
return aiohttp.web.Response(status=204)
|
|
return aiohttp.web.Response(status=400, text="Invalid Lock Token")
|
|
|
|
def generate_lock_response(self, lock_id):
|
|
nsmap = {"D": "DAV:"}
|
|
root = etree.Element("{DAV:}prop", nsmap=nsmap)
|
|
lock_discovery = etree.SubElement(root, "{DAV:}lockdiscovery")
|
|
active_lock = etree.SubElement(lock_discovery, "{DAV:}activelock")
|
|
lock_type = etree.SubElement(active_lock, "{DAV:}locktype")
|
|
etree.SubElement(lock_type, "{DAV:}write")
|
|
lock_scope = etree.SubElement(active_lock, "{DAV:}lockscope")
|
|
etree.SubElement(lock_scope, "{DAV:}exclusive")
|
|
etree.SubElement(active_lock, "{DAV:}depth").text = "Infinity"
|
|
owner = etree.SubElement(active_lock, "{DAV:}owner")
|
|
etree.SubElement(owner, "{DAV:}href").text = lock_id
|
|
etree.SubElement(active_lock, "{DAV:}timeout").text = "Infinite"
|
|
lock_token = etree.SubElement(active_lock, "{DAV:}locktoken")
|
|
etree.SubElement(lock_token, "{DAV:}href").text = f"opaquelocktoken:{lock_id}"
|
|
return etree.tostring(root, pretty_print=True, encoding="utf-8").decode()
|
|
|
|
def get_last_modified(self, path):
|
|
if not os.path.exists(path):
|
|
return None
|
|
timestamp = os.path.getmtime(path)
|
|
dt = datetime.datetime.utcfromtimestamp(timestamp)
|
|
return dt.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
|
|
|
async def handle_head(self, request):
|
|
if not await self.authenticate(request):
|
|
return aiohttp.web.Response(
|
|
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
|
)
|
|
|
|
requested_path = request.match_info.get("filename", "")
|
|
abs_path = os.path.join(WEBROOT, requested_path)
|
|
|
|
if not os.path.exists(abs_path):
|
|
return aiohttp.web.Response(status=404, text="File not found")
|
|
|
|
if os.path.isdir(abs_path):
|
|
return aiohttp.web.Response(
|
|
status=403, text="Cannot get metadata for a directory"
|
|
)
|
|
|
|
content_type, _ = mimetypes.guess_type(abs_path)
|
|
content_type = content_type or "application/octet-stream"
|
|
file_size = os.path.getsize(abs_path)
|
|
|
|
headers = {
|
|
"Content-Type": content_type,
|
|
"Content-Length": str(file_size),
|
|
"Last-Modified": self.get_last_modified(abs_path),
|
|
}
|
|
|
|
return aiohttp.web.Response(status=200, headers=headers)
|
|
|
|
|
|
app = WebDavApplication(debug=True)
|
|
|
|
if __name__ == "__main__":
|
|
pathlib.Path(WEBROOT).mkdir(parents=True, exist_ok=True)
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
loop = asyncio.get_event_loop()
|
|
loop.set_default_executor(ThreadPoolExecutor(200))
|
|
aiohttp.web.run_app(app, port=8095)
|