From 94c5ce49897245fa6f37ff504ecde45aae19358c Mon Sep 17 00:00:00 2001 From: retoor Date: Thu, 10 Jul 2025 03:25:40 +0200 Subject: [PATCH] WebDav properies support. --- src/snek/webdav.py | 565 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 560 insertions(+), 5 deletions(-) mode change 100755 => 100644 src/snek/webdav.py diff --git a/src/snek/webdav.py b/src/snek/webdav.py old mode 100755 new mode 100644 index 4c57fab..e86ea24 --- a/src/snek/webdav.py +++ b/src/snek/webdav.py @@ -4,6 +4,7 @@ import pathlib logging.basicConfig(level=logging.DEBUG) import base64 import datetime +import json import mimetypes import os import shutil @@ -48,6 +49,9 @@ class WebdavApplication(aiohttp.web.Application): self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch) self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock) self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock) + self.router.add_route("PROPGET", "/{filename:.*}", self.handle_propget) + self.router.add_route("PROPSET", "/{filename:.*}", self.handle_propset) + self.router.add_route("PROPDEL", "/{filename:.*}", self.handle_propdel) self.parent = parent @property @@ -118,6 +122,10 @@ class WebdavApplication(aiohttp.web.Application): file_path = request["home"] / request.match_info["filename"] if file_path.is_file(): file_path.unlink() + # Also delete properties file if exists + props_file = self.get_props_file_path(file_path) + if props_file.exists(): + props_file.unlink() return aiohttp.web.Response(status=204) elif file_path.is_dir(): shutil.rmtree(file_path) @@ -147,6 +155,11 @@ class WebdavApplication(aiohttp.web.Application): if not src_path.exists(): return aiohttp.web.Response(status=404, text="Source not found") shutil.move(str(src_path), str(dest_path)) + # Also move properties file if exists + src_props = self.get_props_file_path(src_path) + if src_props.exists(): + dest_props = self.get_props_file_path(dest_path) + shutil.move(str(src_props), str(dest_props)) return aiohttp.web.Response(status=201, text="Moved successfully") async def handle_copy(self, request): @@ -164,12 +177,17 @@ class WebdavApplication(aiohttp.web.Application): shutil.copy2(str(src_path), str(dest_path)) else: shutil.copytree(str(src_path), str(dest_path)) + # Also copy properties file if exists + src_props = self.get_props_file_path(src_path) + if src_props.exists(): + dest_props = self.get_props_file_path(dest_path) + shutil.copy2(str(src_props), str(dest_props)) return aiohttp.web.Response(status=201, text="Copied successfully") async def handle_options(self, request): headers = { "DAV": "1, 2", - "Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH", + "Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH, PROPGET, PROPSET, PROPDEL", } return aiohttp.web.Response(status=200, headers=headers) @@ -258,6 +276,7 @@ class WebdavApplication(aiohttp.web.Application): for item in abs_path.iterdir(): await self.create_node(request, response_xml, item, depth - 1) + async def handle_propfind(self, request): if not await self.authenticate(request): return aiohttp.web.Response( @@ -271,14 +290,48 @@ class WebdavApplication(aiohttp.web.Application): pass requested_path = request.match_info.get("filename", "") - abs_path = request["home"] / requested_path + if not abs_path.exists(): - return aiohttp.web.Response(status=404, text="Directory not found") + return aiohttp.web.Response(status=404, text="Resource not found") + + # Parse request body to determine what properties are requested + body = await request.read() + requested_props = [] + propname_only = False + all_props = True + + if body: + try: + root = etree.fromstring(body) + + # Check for propname request (list property names only) + if root.find(".//{DAV:}propname") is not None: + propname_only = True + all_props = False + + # Check for specific properties request + prop_elem = root.find(".//{DAV:}prop") + if prop_elem is not None and not propname_only: + all_props = False + for child in prop_elem: + requested_props.append((child.tag, child.nsmap)) + + # Check for allprop request + if root.find(".//{DAV:}allprop") is not None: + all_props = True + except: + pass + + # Build response nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) - await self.create_node(request, response_xml, abs_path, depth) + # Create nodes with depth support + await self.create_propfind_node( + request, response_xml, abs_path, depth, + requested_props, propname_only, all_props + ) xml_output = etree.tostring( response_xml, encoding="utf-8", xml_declaration=True @@ -287,12 +340,281 @@ class WebdavApplication(aiohttp.web.Application): status=207, text=xml_output, content_type="application/xml" ) + async def create_propfind_node(self, request, response_xml, full_path, depth, + requested_props, propname_only, all_props): + """Create a PROPFIND response node with custom property support""" + abs_path = pathlib.Path(full_path) + relative_path = str(full_path.relative_to(request["home"])) + + href_path = f"{self.relative_url}/{relative_path}".strip(".") + href_path = href_path.replace("./", "/") + href_path = href_path.replace("//", "/") + + response = etree.SubElement(response_xml, "{DAV:}response") + href = etree.SubElement(response, "{DAV:}href") + href.text = href_path + + # Load custom properties + custom_properties = await self.load_properties(abs_path) + + # Create propstat for found properties + propstat_ok = etree.SubElement(response, "{DAV:}propstat") + prop_ok = etree.SubElement(propstat_ok, "{DAV:}prop") + + # Track what properties were found + found_props = [] + + # Standard DAV properties + standard_props = { + "{DAV:}resourcetype": None, + "{DAV:}creationdate": None, + "{DAV:}getlastmodified": None, + "{DAV:}displayname": full_path.name, + "{DAV:}lockdiscovery": None, + "{DAV:}supportedlock": None, + "{DAV:}quota-used-bytes": None, + "{DAV:}quota-available-bytes": None, + } + + if full_path.is_file(): + mimetype, _ = mimetypes.guess_type(full_path.name) + standard_props["{DAV:}getcontentlength"] = None + standard_props["{DAV:}getcontenttype"] = mimetype + + # If specific properties requested + if not all_props and requested_props: + propstat_notfound = None + prop_notfound = None + + for prop_tag, prop_nsmap in requested_props: + if prop_tag in standard_props: + # Add standard property + await self.add_standard_property(prop_ok, prop_tag, full_path) + found_props.append(prop_tag) + elif prop_tag in custom_properties: + # Add custom property + if propname_only: + etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap) + else: + elem = etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap) + elem.text = str(custom_properties[prop_tag]) + found_props.append(prop_tag) + else: + # Property not found + if propstat_notfound is None: + propstat_notfound = etree.SubElement(response, "{DAV:}propstat") + prop_notfound = etree.SubElement(propstat_notfound, "{DAV:}prop") + etree.SubElement(prop_notfound, prop_tag, nsmap=prop_nsmap) + + if propstat_notfound is not None: + etree.SubElement(propstat_notfound, "{DAV:}status").text = "HTTP/1.1 404 Not Found" + + # If all properties or propname requested + else: + # Add all standard properties + for prop_name in standard_props: + if propname_only: + etree.SubElement(prop_ok, prop_name) + else: + await self.add_standard_property(prop_ok, prop_name, full_path) + + # Add all custom properties + for prop_name, prop_value in custom_properties.items(): + if propname_only: + # Extract namespace from property name + if prop_name.startswith("{"): + ns_end = prop_name.find("}") + ns = prop_name[1:ns_end] + local_name = prop_name[ns_end+1:] + # Create appropriate nsmap for this property + if ns != "DAV:": + prop_nsmap = {"D": "DAV:", None: ns} + else: + prop_nsmap = {"D": "DAV:"} + elem = etree.SubElement(prop_ok, prop_name, nsmap=prop_nsmap) + else: + etree.SubElement(prop_ok, prop_name) + else: + elem = etree.SubElement(prop_ok, prop_name) + elem.text = str(prop_value) + + etree.SubElement(propstat_ok, "{DAV:}status").text = "HTTP/1.1 200 OK" + + # Handle depth for collections + if abs_path.is_dir() and depth > 0: + for item in abs_path.iterdir(): + # Skip hidden property files + if item.name.startswith(".") and item.name.endswith(".webdav_props.json"): + continue + await self.create_propfind_node( + request, response_xml, item, depth - 1, + requested_props, propname_only, all_props + ) + + async def add_standard_property(self, prop_elem, prop_name, full_path): + """Add a standard DAV property to the prop element""" + if prop_name == "{DAV:}resourcetype": + res_type = etree.SubElement(prop_elem, "{DAV:}resourcetype") + if full_path.is_dir(): + etree.SubElement(res_type, "{DAV:}collection") + + elif prop_name == "{DAV:}creationdate": + creation_date, _ = self.get_current_utc_time(full_path) + etree.SubElement(prop_elem, "{DAV:}creationdate").text = creation_date + + elif prop_name == "{DAV:}getlastmodified": + _, last_modified = self.get_current_utc_time(full_path) + etree.SubElement(prop_elem, "{DAV:}getlastmodified").text = last_modified + + elif prop_name == "{DAV:}displayname": + etree.SubElement(prop_elem, "{DAV:}displayname").text = full_path.name + + elif prop_name == "{DAV:}lockdiscovery": + etree.SubElement(prop_elem, "{DAV:}lockdiscovery") + + elif prop_name == "{DAV:}supportedlock": + supported_lock = etree.SubElement(prop_elem, "{DAV:}supportedlock") + lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry") + lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope") + etree.SubElement(lock_scope_1, "{DAV:}exclusive") + lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype") + etree.SubElement(lock_type_1, "{DAV:}write") + lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry") + lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope") + etree.SubElement(lock_scope_2, "{DAV:}shared") + lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype") + etree.SubElement(lock_type_2, "{DAV:}write") + + elif prop_name == "{DAV:}quota-used-bytes": + size = await self.get_file_size(full_path) if full_path.is_file() else await self.get_directory_size(full_path) + etree.SubElement(prop_elem, "{DAV:}quota-used-bytes").text = str(size) + + elif prop_name == "{DAV:}quota-available-bytes": + free_space = await self.get_disk_free_space(str(full_path.parent)) + etree.SubElement(prop_elem, "{DAV:}quota-available-bytes").text = str(free_space) + + elif prop_name == "{DAV:}getcontentlength" and full_path.is_file(): + size = await self.get_file_size(full_path) + etree.SubElement(prop_elem, "{DAV:}getcontentlength").text = str(size) + + elif prop_name == "{DAV:}getcontenttype" and full_path.is_file(): + mimetype, _ = mimetypes.guess_type(full_path.name) + if mimetype: + etree.SubElement(prop_elem, "{DAV:}getcontenttype").text = mimetype + async def handle_proppatch(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) - return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)") + + requested_path = request.match_info.get("filename", "") + abs_path = request["home"] / requested_path + + if not abs_path.exists(): + return aiohttp.web.Response(status=404, text="Resource not found") + + # Parse request body + body = await request.read() + if not body: + return aiohttp.web.Response(status=400, text="Bad Request") + + try: + root = etree.fromstring(body) + + # Load existing properties + properties = await self.load_properties(abs_path) + + # Build response + nsmap = {"D": "DAV:"} + response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) + response = etree.SubElement(response_xml, "{DAV:}response") + + href = etree.SubElement(response, "{DAV:}href") + relative_path = str(abs_path.relative_to(request["home"])) + href_path = f"{self.relative_url}/{relative_path}".strip(".") + href_path = href_path.replace("./", "/").replace("//", "/") + href.text = href_path + + # Process set operations + set_elem = root.find(".//{DAV:}set") + if set_elem is not None: + prop_elem = set_elem.find("{DAV:}prop") + if prop_elem is not None: + propstat = etree.SubElement(response, "{DAV:}propstat") + prop = etree.SubElement(propstat, "{DAV:}prop") + + for child in prop_elem: + # Get namespace and property name + if child.tag.startswith("{"): + ns_end = child.tag.find("}") + ns = child.tag[1:ns_end] + local_name = child.tag[ns_end+1:] + else: + ns = "DAV:" + local_name = child.tag + + prop_name = f"{{{ns}}}{local_name}" + prop_value = child.text or "" + + # Set the property + properties[prop_name] = prop_value + + # Add to response + elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap) + + etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" + + # Process remove operations + remove_elem = root.find(".//{DAV:}remove") + if remove_elem is not None: + prop_elem = remove_elem.find("{DAV:}prop") + if prop_elem is not None: + propstat = etree.SubElement(response, "{DAV:}propstat") + prop = etree.SubElement(propstat, "{DAV:}prop") + + for child in prop_elem: + # Get namespace and property name + if child.tag.startswith("{"): + ns_end = child.tag.find("}") + ns = child.tag[1:ns_end] + local_name = child.tag[ns_end+1:] + else: + ns = "DAV:" + local_name = child.tag + + prop_name = f"{{{ns}}}{local_name}" + + # Remove the property if it exists + if prop_name in properties: + del properties[prop_name] + status = "HTTP/1.1 200 OK" + else: + status = "HTTP/1.1 404 Not Found" + + # Add to response + elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap) + + etree.SubElement(propstat, "{DAV:}status").text = status + + # Save updated properties or delete file if empty + if properties: + await self.save_properties(abs_path, properties) + else: + await self.delete_properties_file(abs_path) + + xml_output = etree.tostring( + response_xml, encoding="utf-8", xml_declaration=True + ).decode() + return aiohttp.web.Response( + status=207, text=xml_output, content_type="application/xml" + ) + + except Exception as e: + # Log the error for debugging + logging.error(f"PROPPATCH error: {e}") + return aiohttp.web.Response(status=400, text="Bad Request") + async def handle_lock(self, request): if not await self.authenticate(request): @@ -375,3 +697,236 @@ class WebdavApplication(aiohttp.web.Application): } return aiohttp.web.Response(status=200, headers=headers) + + def get_props_file_path(self, resource_path): + """Get the path to the properties file for a resource""" + if resource_path.is_dir(): + return resource_path / ".webdav_props.json" + else: + return resource_path.parent / f".{resource_path.name}.webdav_props.json" + + async def load_properties(self, resource_path): + """Load custom properties for a resource""" + props_file = self.get_props_file_path(resource_path) + print(props_file) + if props_file.exists(): + async with aiofiles.open(props_file, "r") as f: + content = await f.read() + return json.loads(content) + return {} + + async def save_properties(self, resource_path, properties): + """Save custom properties for a resource""" + props_file = self.get_props_file_path(resource_path) + print(props_file) + async with aiofiles.open(props_file, "w") as f: + await f.write(json.dumps(properties, indent=2)) + + async def delete_properties_file(self, resource_path): + """Delete the properties file for a resource""" + props_file = self.get_props_file_path(resource_path) + print(props_file) + if props_file.exists(): + props_file.unlink() + + async def handle_propget(self, request): + + if not await self.authenticate(request): + return aiohttp.web.Response( + status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} + ) + + requested_path = request.match_info.get("filename", "") + abs_path = request["home"] / requested_path + + if not abs_path.exists(): + return aiohttp.web.Response(status=404, text="Resource not found") + + # Load properties + properties = await self.load_properties(abs_path) + + # Parse request body to get requested properties + body = await request.read() + requested_props = [] + + if body: + try: + root = etree.fromstring(body) + prop_elem = root.find(".//{DAV:}prop") + if prop_elem is not None: + for child in prop_elem: + ns = child.nsmap.get(None, "DAV:") + requested_props.append(f"{{{ns}}}{child.tag.split('}')[-1]}") + except: + pass + + # Build response + nsmap = {"D": "DAV:"} + response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) + response = etree.SubElement(response_xml, "{DAV:}response") + + href = etree.SubElement(response, "{DAV:}href") + relative_path = str(abs_path.relative_to(request["home"])) + href_path = f"{self.relative_url}/{relative_path}".strip(".") + href_path = href_path.replace("./", "/").replace("//", "/") + href.text = href_path + + propstat = etree.SubElement(response, "{DAV:}propstat") + prop = etree.SubElement(propstat, "{DAV:}prop") + + # Add requested properties or all properties + if requested_props: + for prop_name in requested_props: + if prop_name in properties: + elem = etree.SubElement(prop, prop_name) + elem.text = str(properties[prop_name]) + else: + for prop_name, prop_value in properties.items(): + elem = etree.SubElement(prop, prop_name) + elem.text = str(prop_value) + + etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" + + xml_output = etree.tostring( + response_xml, encoding="utf-8", xml_declaration=True + ).decode() + return aiohttp.web.Response( + status=207, text=xml_output, content_type="application/xml" + ) + + async def handle_propset(self, request): + if not await self.authenticate(request): + return aiohttp.web.Response( + status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} + ) + + requested_path = request.match_info.get("filename", "") + abs_path = request["home"] / requested_path + + if not abs_path.exists(): + return aiohttp.web.Response(status=404, text="Resource not found") + + # Parse request body + body = await request.read() + if not body: + return aiohttp.web.Response(status=400, text="Bad Request") + + try: + root = etree.fromstring(body) + prop_elem = root.find(".//{DAV:}prop") + if prop_elem is None: + return aiohttp.web.Response(status=400, text="Bad Request") + + # Load existing properties + properties = await self.load_properties(abs_path) + + # Update properties + for child in prop_elem: + ns = child.nsmap.get(None, "DAV:") + prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" + prop_value = child.text or "" + properties[prop_name] = prop_value + + # Save updated properties + await self.save_properties(abs_path, properties) + + # Build response + nsmap = {"D": "DAV:"} + response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) + response = etree.SubElement(response_xml, "{DAV:}response") + + href = etree.SubElement(response, "{DAV:}href") + relative_path = str(abs_path.relative_to(request["home"])) + href_path = f"{self.relative_url}/{relative_path}".strip(".") + href_path = href_path.replace("./", "/").replace("//", "/") + href.text = href_path + + propstat = etree.SubElement(response, "{DAV:}propstat") + prop = etree.SubElement(propstat, "{DAV:}prop") + + for child in prop_elem: + ns = child.nsmap.get(None, "DAV:") + prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" + elem = etree.SubElement(prop, prop_name) + + etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" + + xml_output = etree.tostring( + response_xml, encoding="utf-8", xml_declaration=True + ).decode() + return aiohttp.web.Response( + status=207, text=xml_output, content_type="application/xml" + ) + except Exception as e: + return aiohttp.web.Response(status=400, text="Bad Request") + + async def handle_propdel(self, request): + if not await self.authenticate(request): + return aiohttp.web.Response( + status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} + ) + + requested_path = request.match_info.get("filename", "") + abs_path = request["home"] / requested_path + + if not abs_path.exists(): + return aiohttp.web.Response(status=404, text="Resource not found") + + # Parse request body + body = await request.read() + if not body: + # If no body, delete all properties + await self.delete_properties_file(abs_path) + return aiohttp.web.Response(status=204) + + try: + root = etree.fromstring(body) + prop_elem = root.find(".//{DAV:}prop") + if prop_elem is None: + return aiohttp.web.Response(status=400, text="Bad Request") + + # Load existing properties + properties = await self.load_properties(abs_path) + + # Delete specified properties + for child in prop_elem: + ns = child.nsmap.get(None, "DAV:") + prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" + if prop_name in properties: + del properties[prop_name] + + # Save updated properties or delete file if empty + if properties: + await self.save_properties(abs_path, properties) + else: + await self.delete_properties_file(abs_path) + + # Build response + nsmap = {"D": "DAV:"} + response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) + response = etree.SubElement(response_xml, "{DAV:}response") + + href = etree.SubElement(response, "{DAV:}href") + relative_path = str(abs_path.relative_to(request["home"])) + href_path = f"{self.relative_url}/{relative_path}".strip(".") + href_path = href_path.replace("./", "/").replace("//", "/") + href.text = href_path + + propstat = etree.SubElement(response, "{DAV:}propstat") + prop = etree.SubElement(propstat, "{DAV:}prop") + + for child in prop_elem: + ns = child.nsmap.get(None, "DAV:") + prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" + elem = etree.SubElement(prop, prop_name) + + etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" + + xml_output = etree.tostring( + response_xml, encoding="utf-8", xml_declaration=True + ).decode() + return aiohttp.web.Response( + status=207, text=xml_output, content_type="application/xml" + ) + except Exception as e: + return aiohttp.web.Response(status=400, text="Bad Request")