WebDav properies support.

This commit is contained in:
retoor 2025-07-10 03:25:40 +02:00
parent 6a05b56481
commit 94c5ce4989

565
src/snek/webdav.py Executable file → Normal file
View File

@ -4,6 +4,7 @@ import pathlib
logging.basicConfig(level=logging.DEBUG)
import base64
import datetime
import json
import mimetypes
import os
import shutil
@ -48,6 +49,9 @@ class WebdavApplication(aiohttp.web.Application):
self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch)
self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock)
self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock)
self.router.add_route("PROPGET", "/{filename:.*}", self.handle_propget)
self.router.add_route("PROPSET", "/{filename:.*}", self.handle_propset)
self.router.add_route("PROPDEL", "/{filename:.*}", self.handle_propdel)
self.parent = parent
@property
@ -118,6 +122,10 @@ class WebdavApplication(aiohttp.web.Application):
file_path = request["home"] / request.match_info["filename"]
if file_path.is_file():
file_path.unlink()
# Also delete properties file if exists
props_file = self.get_props_file_path(file_path)
if props_file.exists():
props_file.unlink()
return aiohttp.web.Response(status=204)
elif file_path.is_dir():
shutil.rmtree(file_path)
@ -147,6 +155,11 @@ class WebdavApplication(aiohttp.web.Application):
if not src_path.exists():
return aiohttp.web.Response(status=404, text="Source not found")
shutil.move(str(src_path), str(dest_path))
# Also move properties file if exists
src_props = self.get_props_file_path(src_path)
if src_props.exists():
dest_props = self.get_props_file_path(dest_path)
shutil.move(str(src_props), str(dest_props))
return aiohttp.web.Response(status=201, text="Moved successfully")
async def handle_copy(self, request):
@ -164,12 +177,17 @@ class WebdavApplication(aiohttp.web.Application):
shutil.copy2(str(src_path), str(dest_path))
else:
shutil.copytree(str(src_path), str(dest_path))
# Also copy properties file if exists
src_props = self.get_props_file_path(src_path)
if src_props.exists():
dest_props = self.get_props_file_path(dest_path)
shutil.copy2(str(src_props), str(dest_props))
return aiohttp.web.Response(status=201, text="Copied successfully")
async def handle_options(self, request):
headers = {
"DAV": "1, 2",
"Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH",
"Allow": "OPTIONS, GET, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH, PROPGET, PROPSET, PROPDEL",
}
return aiohttp.web.Response(status=200, headers=headers)
@ -258,6 +276,7 @@ class WebdavApplication(aiohttp.web.Application):
for item in abs_path.iterdir():
await self.create_node(request, response_xml, item, depth - 1)
async def handle_propfind(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
@ -271,14 +290,48 @@ class WebdavApplication(aiohttp.web.Application):
pass
requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Directory not found")
return aiohttp.web.Response(status=404, text="Resource not found")
# Parse request body to determine what properties are requested
body = await request.read()
requested_props = []
propname_only = False
all_props = True
if body:
try:
root = etree.fromstring(body)
# Check for propname request (list property names only)
if root.find(".//{DAV:}propname") is not None:
propname_only = True
all_props = False
# Check for specific properties request
prop_elem = root.find(".//{DAV:}prop")
if prop_elem is not None and not propname_only:
all_props = False
for child in prop_elem:
requested_props.append((child.tag, child.nsmap))
# Check for allprop request
if root.find(".//{DAV:}allprop") is not None:
all_props = True
except:
pass
# Build response
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
await self.create_node(request, response_xml, abs_path, depth)
# Create nodes with depth support
await self.create_propfind_node(
request, response_xml, abs_path, depth,
requested_props, propname_only, all_props
)
xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
@ -287,12 +340,281 @@ class WebdavApplication(aiohttp.web.Application):
status=207, text=xml_output, content_type="application/xml"
)
async def create_propfind_node(self, request, response_xml, full_path, depth,
requested_props, propname_only, all_props):
"""Create a PROPFIND response node with custom property support"""
abs_path = pathlib.Path(full_path)
relative_path = str(full_path.relative_to(request["home"]))
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/")
href_path = href_path.replace("//", "/")
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
href.text = href_path
# Load custom properties
custom_properties = await self.load_properties(abs_path)
# Create propstat for found properties
propstat_ok = etree.SubElement(response, "{DAV:}propstat")
prop_ok = etree.SubElement(propstat_ok, "{DAV:}prop")
# Track what properties were found
found_props = []
# Standard DAV properties
standard_props = {
"{DAV:}resourcetype": None,
"{DAV:}creationdate": None,
"{DAV:}getlastmodified": None,
"{DAV:}displayname": full_path.name,
"{DAV:}lockdiscovery": None,
"{DAV:}supportedlock": None,
"{DAV:}quota-used-bytes": None,
"{DAV:}quota-available-bytes": None,
}
if full_path.is_file():
mimetype, _ = mimetypes.guess_type(full_path.name)
standard_props["{DAV:}getcontentlength"] = None
standard_props["{DAV:}getcontenttype"] = mimetype
# If specific properties requested
if not all_props and requested_props:
propstat_notfound = None
prop_notfound = None
for prop_tag, prop_nsmap in requested_props:
if prop_tag in standard_props:
# Add standard property
await self.add_standard_property(prop_ok, prop_tag, full_path)
found_props.append(prop_tag)
elif prop_tag in custom_properties:
# Add custom property
if propname_only:
etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap)
else:
elem = etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap)
elem.text = str(custom_properties[prop_tag])
found_props.append(prop_tag)
else:
# Property not found
if propstat_notfound is None:
propstat_notfound = etree.SubElement(response, "{DAV:}propstat")
prop_notfound = etree.SubElement(propstat_notfound, "{DAV:}prop")
etree.SubElement(prop_notfound, prop_tag, nsmap=prop_nsmap)
if propstat_notfound is not None:
etree.SubElement(propstat_notfound, "{DAV:}status").text = "HTTP/1.1 404 Not Found"
# If all properties or propname requested
else:
# Add all standard properties
for prop_name in standard_props:
if propname_only:
etree.SubElement(prop_ok, prop_name)
else:
await self.add_standard_property(prop_ok, prop_name, full_path)
# Add all custom properties
for prop_name, prop_value in custom_properties.items():
if propname_only:
# Extract namespace from property name
if prop_name.startswith("{"):
ns_end = prop_name.find("}")
ns = prop_name[1:ns_end]
local_name = prop_name[ns_end+1:]
# Create appropriate nsmap for this property
if ns != "DAV:":
prop_nsmap = {"D": "DAV:", None: ns}
else:
prop_nsmap = {"D": "DAV:"}
elem = etree.SubElement(prop_ok, prop_name, nsmap=prop_nsmap)
else:
etree.SubElement(prop_ok, prop_name)
else:
elem = etree.SubElement(prop_ok, prop_name)
elem.text = str(prop_value)
etree.SubElement(propstat_ok, "{DAV:}status").text = "HTTP/1.1 200 OK"
# Handle depth for collections
if abs_path.is_dir() and depth > 0:
for item in abs_path.iterdir():
# Skip hidden property files
if item.name.startswith(".") and item.name.endswith(".webdav_props.json"):
continue
await self.create_propfind_node(
request, response_xml, item, depth - 1,
requested_props, propname_only, all_props
)
async def add_standard_property(self, prop_elem, prop_name, full_path):
"""Add a standard DAV property to the prop element"""
if prop_name == "{DAV:}resourcetype":
res_type = etree.SubElement(prop_elem, "{DAV:}resourcetype")
if full_path.is_dir():
etree.SubElement(res_type, "{DAV:}collection")
elif prop_name == "{DAV:}creationdate":
creation_date, _ = self.get_current_utc_time(full_path)
etree.SubElement(prop_elem, "{DAV:}creationdate").text = creation_date
elif prop_name == "{DAV:}getlastmodified":
_, last_modified = self.get_current_utc_time(full_path)
etree.SubElement(prop_elem, "{DAV:}getlastmodified").text = last_modified
elif prop_name == "{DAV:}displayname":
etree.SubElement(prop_elem, "{DAV:}displayname").text = full_path.name
elif prop_name == "{DAV:}lockdiscovery":
etree.SubElement(prop_elem, "{DAV:}lockdiscovery")
elif prop_name == "{DAV:}supportedlock":
supported_lock = etree.SubElement(prop_elem, "{DAV:}supportedlock")
lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope")
etree.SubElement(lock_scope_1, "{DAV:}exclusive")
lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype")
etree.SubElement(lock_type_1, "{DAV:}write")
lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry")
lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope")
etree.SubElement(lock_scope_2, "{DAV:}shared")
lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype")
etree.SubElement(lock_type_2, "{DAV:}write")
elif prop_name == "{DAV:}quota-used-bytes":
size = await self.get_file_size(full_path) if full_path.is_file() else await self.get_directory_size(full_path)
etree.SubElement(prop_elem, "{DAV:}quota-used-bytes").text = str(size)
elif prop_name == "{DAV:}quota-available-bytes":
free_space = await self.get_disk_free_space(str(full_path.parent))
etree.SubElement(prop_elem, "{DAV:}quota-available-bytes").text = str(free_space)
elif prop_name == "{DAV:}getcontentlength" and full_path.is_file():
size = await self.get_file_size(full_path)
etree.SubElement(prop_elem, "{DAV:}getcontentlength").text = str(size)
elif prop_name == "{DAV:}getcontenttype" and full_path.is_file():
mimetype, _ = mimetypes.guess_type(full_path.name)
if mimetype:
etree.SubElement(prop_elem, "{DAV:}getcontenttype").text = mimetype
async def handle_proppatch(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
return aiohttp.web.Response(status=207, text="PROPPATCH OK (Not Implemented)")
requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Resource not found")
# Parse request body
body = await request.read()
if not body:
return aiohttp.web.Response(status=400, text="Bad Request")
try:
root = etree.fromstring(body)
# Load existing properties
properties = await self.load_properties(abs_path)
# Build response
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
relative_path = str(abs_path.relative_to(request["home"]))
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/").replace("//", "/")
href.text = href_path
# Process set operations
set_elem = root.find(".//{DAV:}set")
if set_elem is not None:
prop_elem = set_elem.find("{DAV:}prop")
if prop_elem is not None:
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
for child in prop_elem:
# Get namespace and property name
if child.tag.startswith("{"):
ns_end = child.tag.find("}")
ns = child.tag[1:ns_end]
local_name = child.tag[ns_end+1:]
else:
ns = "DAV:"
local_name = child.tag
prop_name = f"{{{ns}}}{local_name}"
prop_value = child.text or ""
# Set the property
properties[prop_name] = prop_value
# Add to response
elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap)
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
# Process remove operations
remove_elem = root.find(".//{DAV:}remove")
if remove_elem is not None:
prop_elem = remove_elem.find("{DAV:}prop")
if prop_elem is not None:
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
for child in prop_elem:
# Get namespace and property name
if child.tag.startswith("{"):
ns_end = child.tag.find("}")
ns = child.tag[1:ns_end]
local_name = child.tag[ns_end+1:]
else:
ns = "DAV:"
local_name = child.tag
prop_name = f"{{{ns}}}{local_name}"
# Remove the property if it exists
if prop_name in properties:
del properties[prop_name]
status = "HTTP/1.1 200 OK"
else:
status = "HTTP/1.1 404 Not Found"
# Add to response
elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap)
etree.SubElement(propstat, "{DAV:}status").text = status
# Save updated properties or delete file if empty
if properties:
await self.save_properties(abs_path, properties)
else:
await self.delete_properties_file(abs_path)
xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
).decode()
return aiohttp.web.Response(
status=207, text=xml_output, content_type="application/xml"
)
except Exception as e:
# Log the error for debugging
logging.error(f"PROPPATCH error: {e}")
return aiohttp.web.Response(status=400, text="Bad Request")
async def handle_lock(self, request):
if not await self.authenticate(request):
@ -375,3 +697,236 @@ class WebdavApplication(aiohttp.web.Application):
}
return aiohttp.web.Response(status=200, headers=headers)
def get_props_file_path(self, resource_path):
"""Get the path to the properties file for a resource"""
if resource_path.is_dir():
return resource_path / ".webdav_props.json"
else:
return resource_path.parent / f".{resource_path.name}.webdav_props.json"
async def load_properties(self, resource_path):
"""Load custom properties for a resource"""
props_file = self.get_props_file_path(resource_path)
print(props_file)
if props_file.exists():
async with aiofiles.open(props_file, "r") as f:
content = await f.read()
return json.loads(content)
return {}
async def save_properties(self, resource_path, properties):
"""Save custom properties for a resource"""
props_file = self.get_props_file_path(resource_path)
print(props_file)
async with aiofiles.open(props_file, "w") as f:
await f.write(json.dumps(properties, indent=2))
async def delete_properties_file(self, resource_path):
"""Delete the properties file for a resource"""
props_file = self.get_props_file_path(resource_path)
print(props_file)
if props_file.exists():
props_file.unlink()
async def handle_propget(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Resource not found")
# Load properties
properties = await self.load_properties(abs_path)
# Parse request body to get requested properties
body = await request.read()
requested_props = []
if body:
try:
root = etree.fromstring(body)
prop_elem = root.find(".//{DAV:}prop")
if prop_elem is not None:
for child in prop_elem:
ns = child.nsmap.get(None, "DAV:")
requested_props.append(f"{{{ns}}}{child.tag.split('}')[-1]}")
except:
pass
# Build response
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
relative_path = str(abs_path.relative_to(request["home"]))
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/").replace("//", "/")
href.text = href_path
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
# Add requested properties or all properties
if requested_props:
for prop_name in requested_props:
if prop_name in properties:
elem = etree.SubElement(prop, prop_name)
elem.text = str(properties[prop_name])
else:
for prop_name, prop_value in properties.items():
elem = etree.SubElement(prop, prop_name)
elem.text = str(prop_value)
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
).decode()
return aiohttp.web.Response(
status=207, text=xml_output, content_type="application/xml"
)
async def handle_propset(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Resource not found")
# Parse request body
body = await request.read()
if not body:
return aiohttp.web.Response(status=400, text="Bad Request")
try:
root = etree.fromstring(body)
prop_elem = root.find(".//{DAV:}prop")
if prop_elem is None:
return aiohttp.web.Response(status=400, text="Bad Request")
# Load existing properties
properties = await self.load_properties(abs_path)
# Update properties
for child in prop_elem:
ns = child.nsmap.get(None, "DAV:")
prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}"
prop_value = child.text or ""
properties[prop_name] = prop_value
# Save updated properties
await self.save_properties(abs_path, properties)
# Build response
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
relative_path = str(abs_path.relative_to(request["home"]))
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/").replace("//", "/")
href.text = href_path
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
for child in prop_elem:
ns = child.nsmap.get(None, "DAV:")
prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}"
elem = etree.SubElement(prop, prop_name)
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
).decode()
return aiohttp.web.Response(
status=207, text=xml_output, content_type="application/xml"
)
except Exception as e:
return aiohttp.web.Response(status=400, text="Bad Request")
async def handle_propdel(self, request):
if not await self.authenticate(request):
return aiohttp.web.Response(
status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'}
)
requested_path = request.match_info.get("filename", "")
abs_path = request["home"] / requested_path
if not abs_path.exists():
return aiohttp.web.Response(status=404, text="Resource not found")
# Parse request body
body = await request.read()
if not body:
# If no body, delete all properties
await self.delete_properties_file(abs_path)
return aiohttp.web.Response(status=204)
try:
root = etree.fromstring(body)
prop_elem = root.find(".//{DAV:}prop")
if prop_elem is None:
return aiohttp.web.Response(status=400, text="Bad Request")
# Load existing properties
properties = await self.load_properties(abs_path)
# Delete specified properties
for child in prop_elem:
ns = child.nsmap.get(None, "DAV:")
prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}"
if prop_name in properties:
del properties[prop_name]
# Save updated properties or delete file if empty
if properties:
await self.save_properties(abs_path, properties)
else:
await self.delete_properties_file(abs_path)
# Build response
nsmap = {"D": "DAV:"}
response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap)
response = etree.SubElement(response_xml, "{DAV:}response")
href = etree.SubElement(response, "{DAV:}href")
relative_path = str(abs_path.relative_to(request["home"]))
href_path = f"{self.relative_url}/{relative_path}".strip(".")
href_path = href_path.replace("./", "/").replace("//", "/")
href.text = href_path
propstat = etree.SubElement(response, "{DAV:}propstat")
prop = etree.SubElement(propstat, "{DAV:}prop")
for child in prop_elem:
ns = child.nsmap.get(None, "DAV:")
prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}"
elem = etree.SubElement(prop, prop_name)
etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK"
xml_output = etree.tostring(
response_xml, encoding="utf-8", xml_declaration=True
).decode()
return aiohttp.web.Response(
status=207, text=xml_output, content_type="application/xml"
)
except Exception as e:
return aiohttp.web.Response(status=400, text="Bad Request")