From 9a923f6bddd73df27af80ef6c8e2313816a07a48 Mon Sep 17 00:00:00 2001
From: retoor <retoor@molodetz.nl>
Date: Sat, 29 Mar 2025 07:15:53 +0100
Subject: [PATCH] WEebdav.

---
 src/snek/__init__.py |   3 +
 src/snek/__main__.py |   5 +
 src/snek/webdav.py   | 348 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 356 insertions(+)
 create mode 100644 src/snek/__init__.py
 create mode 100644 src/snek/__main__.py
 create mode 100755 src/snek/webdav.py

diff --git a/src/snek/__init__.py b/src/snek/__init__.py
new file mode 100644
index 0000000..b28b04f
--- /dev/null
+++ b/src/snek/__init__.py
@@ -0,0 +1,3 @@
+
+
+
diff --git a/src/snek/__main__.py b/src/snek/__main__.py
new file mode 100644
index 0000000..30f0209
--- /dev/null
+++ b/src/snek/__main__.py
@@ -0,0 +1,5 @@
+from aiohttp import web 
+from snek.app import Application
+
+if __name__ == '__main__':
+    web.run_app(Application(), port=8081,host='0.0.0.0')
diff --git a/src/snek/webdav.py b/src/snek/webdav.py
new file mode 100755
index 0000000..66aeb3d
--- /dev/null
+++ b/src/snek/webdav.py
@@ -0,0 +1,348 @@
+import logging
+
+import pathlib
+logging.basicConfig(level=logging.DEBUG)
+
+import asyncio
+import base64
+import datetime
+import mimetypes
+import os
+import shutil
+import uuid
+
+import aiofiles
+import aiohttp
+import aiohttp.web
+from lxml import etree
+
+
+class WebdavApplication(aiohttp.web.Application):
+    def __init__(self, parent, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.locks = {}
+        
+        self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options)
+        self.router.add_route("GET", "/{filename:.*}", self.handle_get)
+        self.router.add_route("PUT", "/{filename:.*}", self.handle_put)
+        self.router.add_route("DELETE", "/{filename:.*}", self.handle_delete)
+        self.router.add_route("MKCOL", "/{filename:.*}", self.handle_mkcol)
+        self.router.add_route("MOVE", "/{filename:.*}", self.handle_move)
+        self.router.add_route("COPY", "/{filename:.*}", self.handle_copy)
+        self.router.add_route("PROPFIND", "/{filename:.*}", self.handle_propfind)
+        self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch)
+        self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock)
+        self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock)
+        self.parent = parent
+
+    @property 
+    def db(self):
+        return self.parent.db
+
+    @property 
+    def services(self):
+        return self.parent.services 
+
+
+    async def authenticate(self, request):
+        session = request.session
+        if session.get('uid'):
+            request['user'] = await self.services.user.get(uid=session['uid'])
+            try:
+                request['home'] = await self.services.user.get_home_folder(user_uid=request['user']['uid'])
+            except:
+                pass
+            return user
+
+        auth_header = request.headers.get("Authorization", "")
+        if not auth_header.startswith("Basic "):
+            return False
+        encoded_creds = auth_header.split("Basic ")[1]
+        decoded_creds = base64.b64decode(encoded_creds).decode()
+        username, password = decoded_creds.split(":", 1)
+        request['user'] = await self.services.user.authenticate(username=username, password=password)
+        try:
+            request['home'] = await self.services.user.get_home_folder(request['user']['uid'])
+        except Exception as ex:
+            print(ex)
+            pass
+        return request['user']
+
+    async def handle_get(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+
+        requested_path = request.match_info.get("filename", "")
+        abs_path = request['home'] / requested_path
+
+        if not abs_path.exists():
+            return aiohttp.web.Response(status=404, text="File not found")
+
+        if abs_path.is_dir():
+            return aiohttp.web.Response(status=403, text="Cannot download a directory")
+
+        content_type, _ = mimetypes.guess_type(str(abs_path))
+        content_type = content_type or "application/octet-stream"
+
+        return aiohttp.web.FileResponse(
+            path=str(abs_path), headers={"Content-Type": content_type}, chunk_size=8192
+        )
+
+    async def handle_put(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        file_path = request['home'] / request.match_info["filename"]
+        file_path.parent.mkdir(parents=True, exist_ok=True)
+        async with aiofiles.open(file_path, "wb") as f:
+            while chunk := await request.content.read(1024):
+                await f.write(chunk)
+        return aiohttp.web.Response(status=201, text="File uploaded")
+
+    async def handle_delete(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        file_path = request['home'] / request.match_info["filename"]
+        if file_path.is_file():
+            file_path.unlink()
+            return aiohttp.web.Response(status=204)
+        elif file_path.is_dir():
+            shutil.rmtree(file_path)
+            return aiohttp.web.Response(status=204)
+        return aiohttp.web.Response(status=404, text="Not found")
+
+    async def handle_mkcol(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        dir_path = request['home'] / request.match_info["filename"]
+        if dir_path.exists():
+            return aiohttp.web.Response(status=405, text="Directory already exists")
+        dir_path.mkdir(parents=True, exist_ok=True)
+        return aiohttp.web.Response(status=201, text="Directory created")
+
+    async def handle_move(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        src_path = request['home'] / request.match_info["filename"]
+        dest_path = request['home'] / request.headers.get("Destination", "").replace(
+            "http://localhost:8080/", ""
+        )
+        if not src_path.exists():
+            return aiohttp.web.Response(status=404, text="Source not found")
+        shutil.move(str(src_path), str(dest_path))
+        return aiohttp.web.Response(status=201, text="Moved successfully")
+
+    async def handle_copy(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        src_path = request['home'] / request.match_info["filename"]
+        dest_path = request['home'] / request.headers.get("Destination", "").replace(
+            "http://localhost:8080/", ""
+        )
+        if not src_path.exists():
+            return aiohttp.web.Response(status=404, text="Source not found")
+        if src_path.is_file():
+            shutil.copy2(str(src_path), str(dest_path))
+        else:
+            shutil.copytree(str(src_path), str(dest_path))
+        return aiohttp.web.Response(status=201, text="Copied successfully")
+
+    async def handle_options(self, request):
+        headers = {
+            "DAV": "1, 2",
+            "Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH",
+        }
+        print("RETURN")
+        return aiohttp.web.Response(status=200, headers=headers)
+
+    def get_current_utc_time(self, filepath):
+        if filepath.exists():
+            modified_time = datetime.datetime.utcfromtimestamp(filepath.stat().st_mtime)
+        else:
+            modified_time = datetime.datetime.utcnow()
+        return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ"), modified_time.strftime(
+            "%a, %d %b %Y %H:%M:%S GMT"
+        )
+
+    def get_directory_size(self, directory):
+        total_size = 0
+        for dirpath, _, filenames in os.walk(directory):
+            for f in filenames:
+                fp = pathlib.Path(dirpath) / f
+                if fp.exists():
+                    total_size += fp.stat().st_size
+        return total_size
+
+    def get_disk_free_space(self, path):
+        statvfs = os.statvfs(path)
+        return statvfs.f_bavail * statvfs.f_frsize
+
+    async def handle_propfind(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        requested_path = request.match_info.get("filename", "") 
+        abs_path = request['home'] / requested_path
+        if not abs_path.exists():
+            return aiohttp.web.Response(status=404, text="Directory not found")
+        nsmap = {"D": "DAV:"}
+        response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
+        directories = [requested_path]
+        if abs_path.is_dir():
+            directories.extend(os.listdir(abs_path))
+        for item in directories:
+            full_path = abs_path / item if item != requested_path else abs_path
+            href_path = f"/{requested_path}/{item}/" if item != requested_path else f"/{requested_path}/"
+            href_path = href_path.replace("//", "/")
+            response = etree.SubElement(response_xml, "{DAV:}response")
+            href = etree.SubElement(response, "{DAV:}href")
+            if not full_path.is_dir():
+                href_path = href_path.rstrip("/")
+            href.text = href_path
+            propstat = etree.SubElement(response, "{DAV:}propstat")
+            prop = etree.SubElement(propstat, "{DAV:}prop")
+            res_type = etree.SubElement(prop, "{DAV:}resourcetype")
+            if full_path.is_dir():
+                etree.SubElement(res_type, "{DAV:}collection")
+            creation_date, last_modified = self.get_current_utc_time(full_path)
+            etree.SubElement(prop, "{DAV:}creationdate").text = creation_date
+            etree.SubElement(prop, "{DAV:}quota-used-bytes").text = str(
+                full_path.stat().st_size
+                if full_path.is_file()
+                else self.get_directory_size(full_path)
+            )
+            etree.SubElement(prop, "{DAV:}quota-available-bytes").text = str(
+                self.get_disk_free_space(request['home'])
+            )
+            etree.SubElement(prop, "{DAV:}getlastmodified").text = last_modified
+            etree.SubElement(prop, "{DAV:}displayname").text = full_path.name
+            etree.SubElement(prop, "{DAV:}lockdiscovery")
+            mimetype, _ = mimetypes.guess_type(full_path.name)
+            etree.SubElement(prop, "{DAV:}contenttype").text = mimetype
+            etree.SubElement(prop, "{DAV:}getcontentlength").text = str(
+                full_path.stat().st_size
+                if full_path.is_file()
+                else self.get_directory_size(full_path)
+            )
+            supported_lock = etree.SubElement(prop, "{DAV:}supportedlock")
+            lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
+            lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
+            etree.SubElement(lock_scope_1, "{DAV:}exclusive")
+            lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
+            etree.SubElement(lock_type_1, "{DAV:}write")
+            lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
+            lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
+            etree.SubElement(lock_scope_2, "{DAV:}shared")
+            lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
+            etree.SubElement(lock_type_2, "{DAV:}write")
+            etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
+        xml_output = etree.tostring(
+            response_xml, encoding="utf-8", xml_declaration=True
+        ).decode()
+        return aiohttp.web.Response(
+            status=207, text=xml_output, content_type="application/xml"
+        )
+
+    async def handle_proppatch(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)")
+
+    async def handle_lock(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        resource = request.match_info.get("filename", "/")
+        lock_id = str(uuid.uuid4())
+        self.locks[resource] = lock_id
+        xml_response = self.generate_lock_response(lock_id)
+        headers = {
+            "Lock-Token": f"opaquelocktoken:{lock_id}",
+            "Content-Type": "application/xml",
+        }
+        return aiohttp.web.Response(text=xml_response, headers=headers, status=200)
+
+    async def handle_unlock(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+        resource = request.match_info.get("filename", "/")
+        lock_token = request.headers.get("Lock-Token", "").replace(
+            "opaquelocktoken:", ""
+        )
+        if self.locks.get(resource) == lock_token:
+            del self.locks[resource]
+            return aiohttp.web.Response(status=204)
+        return aiohttp.web.Response(status=400, text="Invalid Lock Token")
+
+    def generate_lock_response(self, lock_id):
+        nsmap = {"D": "DAV:"}
+        root = etree.Element("{DAV:}prop", nsmap=nsmap)
+        lock_discovery = etree.SubElement(root, "{DAV:}lockdiscovery")
+        active_lock = etree.SubElement(lock_discovery, "{DAV:}activelock")
+        lock_type = etree.SubElement(active_lock, "{DAV:}locktype")
+        etree.SubElement(lock_type, "{DAV:}write")
+        lock_scope = etree.SubElement(active_lock, "{DAV:}lockscope")
+        etree.SubElement(lock_scope, "{DAV:}exclusive")
+        etree.SubElement(active_lock, "{DAV:}depth").text = "Infinity"
+        owner = etree.SubElement(active_lock, "{DAV:}owner")
+        etree.SubElement(owner, "{DAV:}href").text = lock_id
+        etree.SubElement(active_lock, "{DAV:}timeout").text = "Infinite"
+        lock_token = etree.SubElement(active_lock, "{DAV:}locktoken")
+        etree.SubElement(lock_token, "{DAV:}href").text = f"opaquelocktoken:{lock_id}"
+        return etree.tostring(root, pretty_print=True, encoding="utf-8").decode()
+
+    def get_last_modified(self, path):
+        if not path.exists():
+            return None
+        timestamp = path.stat().st_mtime
+        dt = datetime.datetime.utcfromtimestamp(timestamp)
+        return dt.strftime("%a, %d %b %Y %H:%M:%S GMT")
+
+    async def handle_head(self, request):
+        if not await self.authenticate(request):
+            return aiohttp.web.Response(
+                status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
+            )
+
+        requested_path = request.match_info.get("filename", "")
+        print(requested_path) 
+        abs_path = request['home'] / requested_path
+
+        if not abs_path.exists():
+            return aiohttp.web.Response(status=404, text="File not found")
+
+        if abs_path.is_dir():
+            return aiohttp.web.Response(
+                status=403, text="Cannot get metadata for a directory"
+            )
+
+        content_type, _ = mimetypes.guess_type(str(abs_path))
+        content_type = content_type or "application/octet-stream"
+        file_size = abs_path.stat().st_size
+
+        headers = {
+            "Content-Type": content_type,
+            "Content-Length": str(file_size),
+            "Last-Modified": self.get_last_modified(abs_path),
+        }
+
+        return aiohttp.web.Response(status=200, headers=headers)
+
+