WEebdav.
This commit is contained in:
parent
29139d5d0c
commit
9a923f6bdd
src/snek
3
src/snek/__init__.py
Normal file
3
src/snek/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
|
||||
|
||||
|
5
src/snek/__main__.py
Normal file
5
src/snek/__main__.py
Normal file
@ -0,0 +1,5 @@
|
||||
from aiohttp import web
|
||||
from snek.app import Application
|
||||
|
||||
if __name__ == '__main__':
|
||||
web.run_app(Application(), port=8081,host='0.0.0.0')
|
348
src/snek/webdav.py
Executable file
348
src/snek/webdav.py
Executable file
@ -0,0 +1,348 @@
|
||||
import logging
|
||||
|
||||
import pathlib
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import datetime
|
||||
import mimetypes
|
||||
import os
|
||||
import shutil
|
||||
import uuid
|
||||
|
||||
import aiofiles
|
||||
import aiohttp
|
||||
import aiohttp.web
|
||||
from lxml import etree
|
||||
|
||||
|
||||
class WebdavApplication(aiohttp.web.Application):
|
||||
def __init__(self, parent, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.locks = {}
|
||||
|
||||
self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options)
|
||||
self.router.add_route("GET", "/{filename:.*}", self.handle_get)
|
||||
self.router.add_route("PUT", "/{filename:.*}", self.handle_put)
|
||||
self.router.add_route("DELETE", "/{filename:.*}", self.handle_delete)
|
||||
self.router.add_route("MKCOL", "/{filename:.*}", self.handle_mkcol)
|
||||
self.router.add_route("MOVE", "/{filename:.*}", self.handle_move)
|
||||
self.router.add_route("COPY", "/{filename:.*}", self.handle_copy)
|
||||
self.router.add_route("PROPFIND", "/{filename:.*}", self.handle_propfind)
|
||||
self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch)
|
||||
self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock)
|
||||
self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock)
|
||||
self.parent = parent
|
||||
|
||||
@property
|
||||
def db(self):
|
||||
return self.parent.db
|
||||
|
||||
@property
|
||||
def services(self):
|
||||
return self.parent.services
|
||||
|
||||
|
||||
async def authenticate(self, request):
|
||||
session = request.session
|
||||
if session.get('uid'):
|
||||
request['user'] = await self.services.user.get(uid=session['uid'])
|
||||
try:
|
||||
request['home'] = await self.services.user.get_home_folder(user_uid=request['user']['uid'])
|
||||
except:
|
||||
pass
|
||||
return user
|
||||
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if not auth_header.startswith("Basic "):
|
||||
return False
|
||||
encoded_creds = auth_header.split("Basic ")[1]
|
||||
decoded_creds = base64.b64decode(encoded_creds).decode()
|
||||
username, password = decoded_creds.split(":", 1)
|
||||
request['user'] = await self.services.user.authenticate(username=username, password=password)
|
||||
try:
|
||||
request['home'] = await self.services.user.get_home_folder(request['user']['uid'])
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
pass
|
||||
return request['user']
|
||||
|
||||
async def handle_get(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
|
||||
requested_path = request.match_info.get("filename", "")
|
||||
abs_path = request['home'] / requested_path
|
||||
|
||||
if not abs_path.exists():
|
||||
return aiohttp.web.Response(status=404, text="File not found")
|
||||
|
||||
if abs_path.is_dir():
|
||||
return aiohttp.web.Response(status=403, text="Cannot download a directory")
|
||||
|
||||
content_type, _ = mimetypes.guess_type(str(abs_path))
|
||||
content_type = content_type or "application/octet-stream"
|
||||
|
||||
return aiohttp.web.FileResponse(
|
||||
path=str(abs_path), headers={"Content-Type": content_type}, chunk_size=8192
|
||||
)
|
||||
|
||||
async def handle_put(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
file_path = request['home'] / request.match_info["filename"]
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with aiofiles.open(file_path, "wb") as f:
|
||||
while chunk := await request.content.read(1024):
|
||||
await f.write(chunk)
|
||||
return aiohttp.web.Response(status=201, text="File uploaded")
|
||||
|
||||
async def handle_delete(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
file_path = request['home'] / request.match_info["filename"]
|
||||
if file_path.is_file():
|
||||
file_path.unlink()
|
||||
return aiohttp.web.Response(status=204)
|
||||
elif file_path.is_dir():
|
||||
shutil.rmtree(file_path)
|
||||
return aiohttp.web.Response(status=204)
|
||||
return aiohttp.web.Response(status=404, text="Not found")
|
||||
|
||||
async def handle_mkcol(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
dir_path = request['home'] / request.match_info["filename"]
|
||||
if dir_path.exists():
|
||||
return aiohttp.web.Response(status=405, text="Directory already exists")
|
||||
dir_path.mkdir(parents=True, exist_ok=True)
|
||||
return aiohttp.web.Response(status=201, text="Directory created")
|
||||
|
||||
async def handle_move(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
src_path = request['home'] / request.match_info["filename"]
|
||||
dest_path = request['home'] / request.headers.get("Destination", "").replace(
|
||||
"http://localhost:8080/", ""
|
||||
)
|
||||
if not src_path.exists():
|
||||
return aiohttp.web.Response(status=404, text="Source not found")
|
||||
shutil.move(str(src_path), str(dest_path))
|
||||
return aiohttp.web.Response(status=201, text="Moved successfully")
|
||||
|
||||
async def handle_copy(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
src_path = request['home'] / request.match_info["filename"]
|
||||
dest_path = request['home'] / request.headers.get("Destination", "").replace(
|
||||
"http://localhost:8080/", ""
|
||||
)
|
||||
if not src_path.exists():
|
||||
return aiohttp.web.Response(status=404, text="Source not found")
|
||||
if src_path.is_file():
|
||||
shutil.copy2(str(src_path), str(dest_path))
|
||||
else:
|
||||
shutil.copytree(str(src_path), str(dest_path))
|
||||
return aiohttp.web.Response(status=201, text="Copied successfully")
|
||||
|
||||
async def handle_options(self, request):
|
||||
headers = {
|
||||
"DAV": "1, 2",
|
||||
"Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH",
|
||||
}
|
||||
print("RETURN")
|
||||
return aiohttp.web.Response(status=200, headers=headers)
|
||||
|
||||
def get_current_utc_time(self, filepath):
|
||||
if filepath.exists():
|
||||
modified_time = datetime.datetime.utcfromtimestamp(filepath.stat().st_mtime)
|
||||
else:
|
||||
modified_time = datetime.datetime.utcnow()
|
||||
return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ"), modified_time.strftime(
|
||||
"%a, %d %b %Y %H:%M:%S GMT"
|
||||
)
|
||||
|
||||
def get_directory_size(self, directory):
|
||||
total_size = 0
|
||||
for dirpath, _, filenames in os.walk(directory):
|
||||
for f in filenames:
|
||||
fp = pathlib.Path(dirpath) / f
|
||||
if fp.exists():
|
||||
total_size += fp.stat().st_size
|
||||
return total_size
|
||||
|
||||
def get_disk_free_space(self, path):
|
||||
statvfs = os.statvfs(path)
|
||||
return statvfs.f_bavail * statvfs.f_frsize
|
||||
|
||||
async def handle_propfind(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
requested_path = request.match_info.get("filename", "")
|
||||
abs_path = request['home'] / requested_path
|
||||
if not abs_path.exists():
|
||||
return aiohttp.web.Response(status=404, text="Directory not found")
|
||||
nsmap = {"D": "DAV:"}
|
||||
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
|
||||
directories = [requested_path]
|
||||
if abs_path.is_dir():
|
||||
directories.extend(os.listdir(abs_path))
|
||||
for item in directories:
|
||||
full_path = abs_path / item if item != requested_path else abs_path
|
||||
href_path = f"/{requested_path}/{item}/" if item != requested_path else f"/{requested_path}/"
|
||||
href_path = href_path.replace("//", "/")
|
||||
response = etree.SubElement(response_xml, "{DAV:}response")
|
||||
href = etree.SubElement(response, "{DAV:}href")
|
||||
if not full_path.is_dir():
|
||||
href_path = href_path.rstrip("/")
|
||||
href.text = href_path
|
||||
propstat = etree.SubElement(response, "{DAV:}propstat")
|
||||
prop = etree.SubElement(propstat, "{DAV:}prop")
|
||||
res_type = etree.SubElement(prop, "{DAV:}resourcetype")
|
||||
if full_path.is_dir():
|
||||
etree.SubElement(res_type, "{DAV:}collection")
|
||||
creation_date, last_modified = self.get_current_utc_time(full_path)
|
||||
etree.SubElement(prop, "{DAV:}creationdate").text = creation_date
|
||||
etree.SubElement(prop, "{DAV:}quota-used-bytes").text = str(
|
||||
full_path.stat().st_size
|
||||
if full_path.is_file()
|
||||
else self.get_directory_size(full_path)
|
||||
)
|
||||
etree.SubElement(prop, "{DAV:}quota-available-bytes").text = str(
|
||||
self.get_disk_free_space(request['home'])
|
||||
)
|
||||
etree.SubElement(prop, "{DAV:}getlastmodified").text = last_modified
|
||||
etree.SubElement(prop, "{DAV:}displayname").text = full_path.name
|
||||
etree.SubElement(prop, "{DAV:}lockdiscovery")
|
||||
mimetype, _ = mimetypes.guess_type(full_path.name)
|
||||
etree.SubElement(prop, "{DAV:}contenttype").text = mimetype
|
||||
etree.SubElement(prop, "{DAV:}getcontentlength").text = str(
|
||||
full_path.stat().st_size
|
||||
if full_path.is_file()
|
||||
else self.get_directory_size(full_path)
|
||||
)
|
||||
supported_lock = etree.SubElement(prop, "{DAV:}supportedlock")
|
||||
lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
|
||||
lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
|
||||
etree.SubElement(lock_scope_1, "{DAV:}exclusive")
|
||||
lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
|
||||
etree.SubElement(lock_type_1, "{DAV:}write")
|
||||
lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
|
||||
lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
|
||||
etree.SubElement(lock_scope_2, "{DAV:}shared")
|
||||
lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
|
||||
etree.SubElement(lock_type_2, "{DAV:}write")
|
||||
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
|
||||
xml_output = etree.tostring(
|
||||
response_xml, encoding="utf-8", xml_declaration=True
|
||||
).decode()
|
||||
return aiohttp.web.Response(
|
||||
status=207, text=xml_output, content_type="application/xml"
|
||||
)
|
||||
|
||||
async def handle_proppatch(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)")
|
||||
|
||||
async def handle_lock(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
resource = request.match_info.get("filename", "/")
|
||||
lock_id = str(uuid.uuid4())
|
||||
self.locks[resource] = lock_id
|
||||
xml_response = self.generate_lock_response(lock_id)
|
||||
headers = {
|
||||
"Lock-Token": f"opaquelocktoken:{lock_id}",
|
||||
"Content-Type": "application/xml",
|
||||
}
|
||||
return aiohttp.web.Response(text=xml_response, headers=headers, status=200)
|
||||
|
||||
async def handle_unlock(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
resource = request.match_info.get("filename", "/")
|
||||
lock_token = request.headers.get("Lock-Token", "").replace(
|
||||
"opaquelocktoken:", ""
|
||||
)
|
||||
if self.locks.get(resource) == lock_token:
|
||||
del self.locks[resource]
|
||||
return aiohttp.web.Response(status=204)
|
||||
return aiohttp.web.Response(status=400, text="Invalid Lock Token")
|
||||
|
||||
def generate_lock_response(self, lock_id):
|
||||
nsmap = {"D": "DAV:"}
|
||||
root = etree.Element("{DAV:}prop", nsmap=nsmap)
|
||||
lock_discovery = etree.SubElement(root, "{DAV:}lockdiscovery")
|
||||
active_lock = etree.SubElement(lock_discovery, "{DAV:}activelock")
|
||||
lock_type = etree.SubElement(active_lock, "{DAV:}locktype")
|
||||
etree.SubElement(lock_type, "{DAV:}write")
|
||||
lock_scope = etree.SubElement(active_lock, "{DAV:}lockscope")
|
||||
etree.SubElement(lock_scope, "{DAV:}exclusive")
|
||||
etree.SubElement(active_lock, "{DAV:}depth").text = "Infinity"
|
||||
owner = etree.SubElement(active_lock, "{DAV:}owner")
|
||||
etree.SubElement(owner, "{DAV:}href").text = lock_id
|
||||
etree.SubElement(active_lock, "{DAV:}timeout").text = "Infinite"
|
||||
lock_token = etree.SubElement(active_lock, "{DAV:}locktoken")
|
||||
etree.SubElement(lock_token, "{DAV:}href").text = f"opaquelocktoken:{lock_id}"
|
||||
return etree.tostring(root, pretty_print=True, encoding="utf-8").decode()
|
||||
|
||||
def get_last_modified(self, path):
|
||||
if not path.exists():
|
||||
return None
|
||||
timestamp = path.stat().st_mtime
|
||||
dt = datetime.datetime.utcfromtimestamp(timestamp)
|
||||
return dt.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
|
||||
async def handle_head(self, request):
|
||||
if not await self.authenticate(request):
|
||||
return aiohttp.web.Response(
|
||||
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
|
||||
)
|
||||
|
||||
requested_path = request.match_info.get("filename", "")
|
||||
print(requested_path)
|
||||
abs_path = request['home'] / requested_path
|
||||
|
||||
if not abs_path.exists():
|
||||
return aiohttp.web.Response(status=404, text="File not found")
|
||||
|
||||
if abs_path.is_dir():
|
||||
return aiohttp.web.Response(
|
||||
status=403, text="Cannot get metadata for a directory"
|
||||
)
|
||||
|
||||
content_type, _ = mimetypes.guess_type(str(abs_path))
|
||||
content_type = content_type or "application/octet-stream"
|
||||
file_size = abs_path.stat().st_size
|
||||
|
||||
headers = {
|
||||
"Content-Type": content_type,
|
||||
"Content-Length": str(file_size),
|
||||
"Last-Modified": self.get_last_modified(abs_path),
|
||||
}
|
||||
|
||||
return aiohttp.web.Response(status=200, headers=headers)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user