# retoor import logging import pathlib import base64 import datetime import json import mimetypes import os import shutil import uuid import aiofiles import aiohttp import aiohttp.web import hashlib from app.cache import time_cache_async from lxml import etree from urllib.parse import urlparse logging.basicConfig(level=logging.DEBUG) @aiohttp.web.middleware async def debug_middleware(request, handler): print(request.method, request.path, request.headers) result = await handler(request) print(result.status) try: print(await result.text()) except: pass return result class WebdavApplication(aiohttp.web.Application): def __init__(self, parent, *args, **kwargs): middlewares = [debug_middleware] super().__init__(middlewares=middlewares, *args, **kwargs) self.locks = {} # rel_path -> lock_dict self.relative_url = "/webdav" self.router.add_route("OPTIONS", "/{filename:.*}", self.handle_options) self.router.add_route("GET", "/{filename:.*}", self.handle_get) self.router.add_route("HEAD", "/{filename:.*}", self.handle_head) self.router.add_route("PUT", "/{filename:.*}", self.handle_put) self.router.add_route("DELETE", "/{filename:.*}", self.handle_delete) self.router.add_route("MKCOL", "/{filename:.*}", self.handle_mkcol) self.router.add_route("MOVE", "/{filename:.*}", self.handle_move) self.router.add_route("COPY", "/{filename:.*}", self.handle_copy) self.router.add_route("PROPFIND", "/{filename:.*}", self.handle_propfind) self.router.add_route("PROPPATCH", "/{filename:.*}", self.handle_proppatch) self.router.add_route("LOCK", "/{filename:.*}", self.handle_lock) self.router.add_route("UNLOCK", "/{filename:.*}", self.handle_unlock) self.router.add_route("PROPGET", "/{filename:.*}", self.handle_propget) self.router.add_route("PROPSET", "/{filename:.*}", self.handle_propset) self.router.add_route("PROPDEL", "/{filename:.*}", self.handle_propdel) self.parent = parent @property def db(self): return self.parent.db @property def services(self): return self.parent.services async def authenticate(self, request): auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Basic "): return False encoded_creds = auth_header.split("Basic ")[1] decoded_creds = base64.b64decode(encoded_creds).decode() username, password = decoded_creds.split(":", 1) request["user"] = await self.services.user.authenticate( username=username, password=password ) try: request["home"] = await self.services.user.get_home_folder( request["user"]["uid"] ) except Exception: pass return request["user"] async def is_locked(self, abs_path, request): rel_path = abs_path.relative_to(request["home"]).as_posix() path = abs_path while path != request["home"].parent: rel = path.relative_to(request["home"]).as_posix() if rel in self.locks: lock = self.locks[rel] if self.is_lock_expired(lock): del self.locks[rel] continue if path == abs_path or lock["depth"] == "infinity": return lock path = path.parent return None async def has_descendant_locks(self, rel_path, is_dir): if not is_dir: return False for lock_path in list(self.locks.keys()): if lock_path.startswith(rel_path + "/"): if not self.is_lock_expired(self.locks[lock_path]): return True else: del self.locks[lock_path] return False def is_lock_expired(self, lock): if lock["timeout"] is None: return False delta = datetime.datetime.utcnow() - lock["created"] return delta.total_seconds() > lock["timeout"] async def handle_get(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="File not found") if abs_path.is_dir(): return aiohttp.web.Response(status=403, text="Cannot download a directory") lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") content_type, _ = mimetypes.guess_type(str(abs_path)) content_type = content_type or "application/octet-stream" etag = f'"{hashlib.sha1(str(abs_path.stat().st_mtime).encode()).hexdigest()}"' headers = {"Content-Type": content_type, "ETag": etag} return aiohttp.web.FileResponse(path=str(abs_path), headers=headers, chunk_size=8192) async def handle_head(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="File not found") if abs_path.is_dir(): return aiohttp.web.Response(status=403, text="Cannot get metadata for a directory") lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") content_type, _ = mimetypes.guess_type(str(abs_path)) content_type = content_type or "application/octet-stream" file_size = abs_path.stat().st_size etag = f'"{hashlib.sha1(str(abs_path.stat().st_mtime).encode()).hexdigest()}"' headers = { "Content-Type": content_type, "Content-Length": str(file_size), "Last-Modified": self.get_last_modified(abs_path), "ETag": etag, } return aiohttp.web.Response(status=200, headers=headers) async def handle_put(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") abs_path.parent.mkdir(parents=True, exist_ok=True) async with aiofiles.open(abs_path, "wb") as f: while chunk := await request.content.read(1024): await f.write(chunk) return aiohttp.web.Response(status=201, text="File uploaded") async def handle_delete(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") if abs_path.is_file(): abs_path.unlink() props_file = self.get_props_file_path(abs_path) if props_file.exists(): props_file.unlink() rel_path = abs_path.relative_to(request["home"]).as_posix() if rel_path in self.locks: del self.locks[rel_path] return aiohttp.web.Response(status=204) elif abs_path.is_dir(): if await self.has_descendant_locks(abs_path.relative_to(request["home"]).as_posix(), True): return aiohttp.web.Response(status=423, text="Locked") shutil.rmtree(abs_path) rel_path = abs_path.relative_to(request["home"]).as_posix() if rel_path in self.locks: del self.locks[rel_path] return aiohttp.web.Response(status=204) return aiohttp.web.Response(status=404, text="Not found") async def handle_mkcol(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") if abs_path.exists(): return aiohttp.web.Response(status=405, text="Directory already exists") abs_path.mkdir(parents=True, exist_ok=True) return aiohttp.web.Response(status=201, text="Directory created") async def handle_move(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) src_path = request["home"] / request.match_info.get("filename", "") destination = request.headers.get("Destination", "") if not destination: return aiohttp.web.Response(status=400, text="Destination header missing") parsed = urlparse(destination) if parsed.scheme and (parsed.scheme != request.scheme or parsed.netloc != request.host): return aiohttp.web.Response(status=502, text="Bad Gateway") dest_rel = parsed.path[len(self.relative_url)+1:] if parsed.path.startswith(self.relative_url) else "" dest_path = request["home"] / dest_rel if not src_path.exists(): return aiohttp.web.Response(status=404, text="Source not found") src_lock = await self.is_locked(src_path, request) dest_lock = await self.is_locked(dest_path, request) submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if src_lock and src_lock["token"] != submitted: return aiohttp.web.Response(status=423, text="Source Locked") if dest_lock and dest_lock["token"] != submitted: return aiohttp.web.Response(status=423, text="Destination Locked") overwrite = request.headers.get("Overwrite", "T") == "T" if dest_path.exists() and not overwrite: return aiohttp.web.Response(status=412, text="Precondition Failed") if dest_path.exists(): if dest_path.is_dir(): shutil.rmtree(dest_path) else: dest_path.unlink() shutil.move(str(src_path), str(dest_path)) src_props = self.get_props_file_path(src_path) if src_props.exists(): dest_props = self.get_props_file_path(dest_path) shutil.move(str(src_props), str(dest_props)) src_rel = src_path.relative_to(request["home"]).as_posix() dest_rel = dest_path.relative_to(request["home"]).as_posix() if src_rel in self.locks: self.locks[dest_rel] = self.locks.pop(src_rel) return aiohttp.web.Response(status=201, text="Moved successfully") async def handle_copy(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) src_path = request["home"] / request.match_info.get("filename", "") destination = request.headers.get("Destination", "") if not destination: return aiohttp.web.Response(status=400, text="Destination header missing") parsed = urlparse(destination) if parsed.scheme and (parsed.scheme != request.scheme or parsed.netloc != request.host): return aiohttp.web.Response(status=502, text="Bad Gateway") dest_rel = parsed.path[len(self.relative_url)+1:] if parsed.path.startswith(self.relative_url) else "" dest_path = request["home"] / dest_rel if not src_path.exists(): return aiohttp.web.Response(status=404, text="Source not found") src_lock = await self.is_locked(src_path, request) dest_lock = await self.is_locked(dest_path, request) submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if src_lock and src_lock["token"] != submitted: return aiohttp.web.Response(status=423, text="Source Locked") if dest_lock and dest_lock["token"] != submitted: return aiohttp.web.Response(status=423, text="Destination Locked") overwrite = request.headers.get("Overwrite", "T") == "T" if dest_path.exists() and not overwrite: return aiohttp.web.Response(status=412, text="Precondition Failed") if dest_path.exists(): if dest_path.is_dir(): shutil.rmtree(dest_path) else: dest_path.unlink() if src_path.is_file(): shutil.copy2(str(src_path), str(dest_path)) else: shutil.copytree(str(src_path), str(dest_path)) src_props = self.get_props_file_path(src_path) if src_props.exists(): dest_props = self.get_props_file_path(dest_path) shutil.copy2(str(src_props), str(dest_props)) return aiohttp.web.Response(status=201, text="Copied successfully") async def handle_options(self, request): headers = { "DAV": "1, 2", "Allow": "OPTIONS, GET, HEAD, PUT, DELETE, MKCOL, MOVE, COPY, PROPFIND, PROPPATCH, LOCK, UNLOCK, PROPGET, PROPSET, PROPDEL", "MS-Author-Via": "DAV", } return aiohttp.web.Response(status=200, headers=headers) def get_current_utc_time(self, filepath): if filepath.exists(): modified_time = datetime.datetime.utcfromtimestamp(filepath.stat().st_mtime) else: modified_time = datetime.datetime.utcnow() return modified_time.strftime("%Y-%m-%dT%H:%M:%SZ"), modified_time.strftime( "%a, %d %b %Y %H:%M:%S GMT" ) @time_cache_async(10) async def get_file_size(self, path): loop = self.parent.loop stat = await loop.run_in_executor(None, os.stat, path) return stat.st_size @time_cache_async(10) async def get_directory_size(self, directory): total_size = 0 for dirpath, _, filenames in os.walk(directory): for f in filenames: fp = pathlib.Path(dirpath) / f if fp.exists(): total_size += await self.get_file_size(str(fp)) return total_size @time_cache_async(30) async def get_disk_free_space(self, path): loop = self.parent.loop statvfs = await loop.run_in_executor(None, os.statvfs, path) return statvfs.f_bavail * statvfs.f_frsize async def create_propfind_node(self, request, response_xml, full_path, depth, requested_props, propname_only, all_props): abs_path = pathlib.Path(full_path) relative_path = abs_path.relative_to(request["home"]).as_posix() href_path = f"{self.relative_url}/{relative_path}".strip(".") href_path = href_path.replace("./", "/").replace("//", "/") response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") href.text = href_path custom_properties = await self.load_properties(abs_path) propstat_ok = etree.SubElement(response, "{DAV:}propstat") prop_ok = etree.SubElement(propstat_ok, "{DAV:}prop") found_props = [] standard_props = { "{DAV:}resourcetype": None, "{DAV:}creationdate": None, "{DAV:}getlastmodified": None, "{DAV:}displayname": full_path.name, "{DAV:}lockdiscovery": None, "{DAV:}supportedlock": None, "{DAV:}quota-used-bytes": None, "{DAV:}quota-available-bytes": None, "{DAV:}getetag": None, } if full_path.is_file(): mimetype, _ = mimetypes.guess_type(full_path.name) standard_props["{DAV:}getcontentlength"] = None standard_props["{DAV:}getcontenttype"] = mimetype if not all_props and requested_props: propstat_notfound = None prop_notfound = None for prop_tag, prop_nsmap in requested_props: if prop_tag in standard_props: await self.add_standard_property(prop_ok, prop_tag, full_path, request) found_props.append(prop_tag) elif prop_tag in custom_properties: if propname_only: etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap) else: elem = etree.SubElement(prop_ok, prop_tag, nsmap=prop_nsmap) elem.text = str(custom_properties[prop_tag]) found_props.append(prop_tag) else: if propstat_notfound is None: propstat_notfound = etree.SubElement(response, "{DAV:}propstat") prop_notfound = etree.SubElement(propstat_notfound, "{DAV:}prop") etree.SubElement(prop_notfound, prop_tag, nsmap=prop_nsmap) if propstat_notfound is not None: etree.SubElement(propstat_notfound, "{DAV:}status").text = "HTTP/1.1 404 Not Found" else: for prop_name in standard_props: if propname_only: etree.SubElement(prop_ok, prop_name) else: await self.add_standard_property(prop_ok, prop_name, full_path, request) for prop_name, prop_value in custom_properties.items(): if propname_only: if prop_name.startswith("{"): ns_end = prop_name.find("}") ns = prop_name[1:ns_end] local_name = prop_name[ns_end+1:] prop_nsmap = {"D": "DAV:", None: ns} if ns != "DAV:" else {"D": "DAV:"} etree.SubElement(prop_ok, local_name, nsmap=prop_nsmap) else: etree.SubElement(prop_ok, prop_name) else: elem = etree.SubElement(prop_ok, prop_name) elem.text = str(prop_value) etree.SubElement(propstat_ok, "{DAV:}status").text = "HTTP/1.1 200 OK" if abs_path.is_dir() and depth > 0: for item in abs_path.iterdir(): if item.name.startswith(".") and item.name.endswith(".webdav_props.json"): continue await self.create_propfind_node(request, response_xml, item, depth - 1, requested_props, propname_only, all_props) async def add_standard_property(self, prop_elem, prop_name, full_path, request): if prop_name == "{DAV:}resourcetype": res_type = etree.SubElement(prop_elem, "{DAV:}resourcetype") if full_path.is_dir(): etree.SubElement(res_type, "{DAV:}collection") elif prop_name == "{DAV:}creationdate": creation_date, _ = self.get_current_utc_time(full_path) etree.SubElement(prop_elem, "{DAV:}creationdate").text = creation_date elif prop_name == "{DAV:}getlastmodified": _, last_modified = self.get_current_utc_time(full_path) etree.SubElement(prop_elem, "{DAV:}getlastmodified").text = last_modified elif prop_name == "{DAV:}displayname": etree.SubElement(prop_elem, "{DAV:}displayname").text = full_path.name elif prop_name == "{DAV:}lockdiscovery": lockdiscovery = etree.SubElement(prop_elem, "{DAV:}lockdiscovery") rel_path = full_path.relative_to(request["home"]).as_posix() if rel_path in self.locks: lock = self.locks[rel_path] if not self.is_lock_expired(lock): activelock = etree.SubElement(lockdiscovery, "{DAV:}activelock") locktype = etree.SubElement(activelock, "{DAV:}locktype") etree.SubElement(locktype, f"{{DAV:}}{lock['type']}") lockscope = etree.SubElement(activelock, "{DAV:}lockscope") etree.SubElement(lockscope, f"{{DAV:}}{lock['scope']}") etree.SubElement(activelock, "{DAV:}depth").text = lock['depth'].capitalize() if lock['owner']: owner = etree.fromstring(lock['owner']) activelock.append(owner) timeout_str = "Infinite" if lock['timeout'] is None else f"Second-{lock['timeout']}" etree.SubElement(activelock, "{DAV:}timeout").text = timeout_str locktoken = etree.SubElement(activelock, "{DAV:}locktoken") etree.SubElement(locktoken, "{DAV:}href").text = f"opaquelocktoken:{lock['token']}" elif prop_name == "{DAV:}supportedlock": supported_lock = etree.SubElement(prop_elem, "{DAV:}supportedlock") lock_entry_1 = etree.SubElement(supported_lock, "{DAV:}lockentry") lock_scope_1 = etree.SubElement(lock_entry_1, "{DAV:}lockscope") etree.SubElement(lock_scope_1, "{DAV:}exclusive") lock_type_1 = etree.SubElement(lock_entry_1, "{DAV:}locktype") etree.SubElement(lock_type_1, "{DAV:}write") lock_entry_2 = etree.SubElement(supported_lock, "{DAV:}lockentry") lock_scope_2 = etree.SubElement(lock_entry_2, "{DAV:}lockscope") etree.SubElement(lock_scope_2, "{DAV:}shared") lock_type_2 = etree.SubElement(lock_entry_2, "{DAV:}locktype") etree.SubElement(lock_type_2, "{DAV:}write") elif prop_name == "{DAV:}quota-used-bytes": size = await self.get_file_size(full_path) if full_path.is_file() else await self.get_directory_size(full_path) etree.SubElement(prop_elem, "{DAV:}quota-used-bytes").text = str(size) elif prop_name == "{DAV:}quota-available-bytes": free_space = await self.get_disk_free_space(str(full_path)) etree.SubElement(prop_elem, "{DAV:}quota-available-bytes").text = str(free_space) elif prop_name == "{DAV:}getcontentlength" and full_path.is_file(): size = await self.get_file_size(full_path) etree.SubElement(prop_elem, "{DAV:}getcontentlength").text = str(size) elif prop_name == "{DAV:}getcontenttype" and full_path.is_file(): mimetype, _ = mimetypes.guess_type(full_path.name) if mimetype: etree.SubElement(prop_elem, "{DAV:}getcontenttype").text = mimetype elif prop_name == "{DAV:}getetag" and full_path.is_file(): etag = f'"{hashlib.sha1(str(full_path.stat().st_mtime).encode()).hexdigest()}"' etree.SubElement(prop_elem, "{DAV:}getetag").text = etag async def handle_propfind(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) depth = request.headers.get("Depth", "0") if depth == "infinity": depth = float("inf") else: try: depth = int(depth) except ValueError: depth = 0 requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") body = await request.read() requested_props = [] propname_only = False all_props = True if body: try: root = etree.fromstring(body) if root.find(".//{DAV:}propname") is not None: propname_only = True all_props = False prop_elem = root.find(".//{DAV:}prop") if prop_elem is not None and not propname_only: all_props = False for child in prop_elem: requested_props.append((child.tag, child.nsmap)) if root.find(".//{DAV:}allprop") is not None: all_props = True except: pass nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) await self.create_propfind_node(request, response_xml, abs_path, depth, requested_props, propname_only, all_props) xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode() return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml") async def handle_proppatch(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") body = await request.read() if not body: return aiohttp.web.Response(status=400, text="Bad Request") try: root = etree.fromstring(body) properties = await self.load_properties(abs_path) nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") relative_path = abs_path.relative_to(request["home"]).as_posix() href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/") href.text = href_path set_elem = root.find(".//{DAV:}set") if set_elem is not None: prop_elem = set_elem.find("{DAV:}prop") if prop_elem is not None: propstat = etree.SubElement(response, "{DAV:}propstat") prop = etree.SubElement(propstat, "{DAV:}prop") for child in prop_elem: if child.tag.startswith("{"): ns_end = child.tag.find("}") ns = child.tag[1:ns_end] local_name = child.tag[ns_end+1:] else: ns = "DAV:" local_name = child.tag prop_name = f"{{{ns}}}{local_name}" prop_value = child.text or "" properties[prop_name] = prop_value elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap) etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" remove_elem = root.find(".//{DAV:}remove") if remove_elem is not None: prop_elem = remove_elem.find("{DAV:}prop") if prop_elem is not None: propstat = etree.SubElement(response, "{DAV:}propstat") prop = etree.SubElement(propstat, "{DAV:}prop") for child in prop_elem: if child.tag.startswith("{"): ns_end = child.tag.find("}") ns = child.tag[1:ns_end] local_name = child.tag[ns_end+1:] else: ns = "DAV:" local_name = child.tag prop_name = f"{{{ns}}}{local_name}" if prop_name in properties: del properties[prop_name] status = "HTTP/1.1 200 OK" else: status = "HTTP/1.1 404 Not Found" elem = etree.SubElement(prop, child.tag, nsmap=child.nsmap) etree.SubElement(propstat, "{DAV:}status").text = status if properties: await self.save_properties(abs_path, properties) else: await self.delete_properties_file(abs_path) xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode() return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml") except Exception as e: logging.error(f"PROPPATCH error: {e}") return aiohttp.web.Response(status=400, text="Bad Request") async def handle_lock(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path resource = abs_path.relative_to(request["home"]).as_posix() body = await request.read() timeout_str = request.headers.get("Timeout", "Second-3600") if timeout_str == "Infinite": timeout = None elif timeout_str.startswith("Second-"): try: timeout = int(timeout_str[7:]) except: timeout = 3600 else: timeout = 3600 depth = request.headers.get("Depth", "infinity").lower() if depth not in ["0", "infinity"]: return aiohttp.web.Response(status=400, text="Invalid Depth") if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") if body: try: root = etree.fromstring(body) lockinfo = root.find(".//{DAV:}lockinfo") if lockinfo is None: return aiohttp.web.Response(status=400, text="Invalid lockinfo") scope = "exclusive" scope_elem = lockinfo.find("{DAV:}lockscope") if scope_elem is not None: if scope_elem.find("{DAV:}shared") is not None: scope = "shared" locktype = "write" owner_elem = lockinfo.find("{DAV:}owner") owner_xml = etree.tostring(owner_elem, encoding="unicode") if owner_elem is not None else "" covering_lock = await self.is_locked(abs_path, request) if covering_lock: return aiohttp.web.Response(status=423, text="Locked") if abs_path.is_dir() and depth == "infinity" and await self.has_descendant_locks(resource, True): return aiohttp.web.Response(status=423, text="Locked") token = str(uuid.uuid4()) lock_dict = { "token": token, "scope": scope, "type": locktype, "owner": owner_xml, "depth": depth, "timeout": timeout, "created": datetime.datetime.utcnow() } self.locks[resource] = lock_dict xml = await self.generate_lock_response(lock_dict) headers = {"Lock-Token": f""} return aiohttp.web.Response(status=200, text=xml, content_type="application/xml", headers=headers) except Exception as e: logging.error(f"LOCK error: {e}") return aiohttp.web.Response(status=400, text="Bad Request") else: if resource not in self.locks: return aiohttp.web.Response(status=412, text="Precondition Failed") lock = self.locks[resource] if self.is_lock_expired(lock): del self.locks[resource] return aiohttp.web.Response(status=412, text="Precondition Failed") submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=409, text="Conflict") lock["created"] = datetime.datetime.utcnow() lock["timeout"] = timeout xml = await self.generate_lock_response(lock) headers = {"Lock-Token": f""} return aiohttp.web.Response(status=200, text=xml, content_type="application/xml", headers=headers) async def handle_unlock(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) resource = request.match_info.get("filename", "") if resource not in self.locks: return aiohttp.web.Response(status=409, text="Conflict") lock = self.locks[resource] if self.is_lock_expired(lock): del self.locks[resource] return aiohttp.web.Response(status=409, text="Conflict") submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=400, text="Invalid Lock Token") del self.locks[resource] return aiohttp.web.Response(status=204) async def generate_lock_response(self, lock_dict): nsmap = {"D": "DAV:"} prop = etree.Element("{DAV:}prop", nsmap=nsmap) lockdiscovery = etree.SubElement(prop, "{DAV:}lockdiscovery") activelock = etree.SubElement(lockdiscovery, "{DAV:}activelock") locktype = etree.SubElement(activelock, "{DAV:}locktype") etree.SubElement(locktype, f"{{DAV:}}{lock_dict['type']}") lockscope = etree.SubElement(activelock, "{DAV:}lockscope") etree.SubElement(lockscope, f"{{DAV:}}{lock_dict['scope']}") etree.SubElement(activelock, "{DAV:}depth").text = lock_dict['depth'].capitalize() if lock_dict['owner']: owner = etree.fromstring(lock_dict['owner']) activelock.append(owner) timeout_str = "Infinite" if lock_dict['timeout'] is None else f"Second-{lock_dict['timeout']}" etree.SubElement(activelock, "{DAV:}timeout").text = timeout_str locktoken = etree.SubElement(activelock, "{DAV:}locktoken") etree.SubElement(locktoken, "{DAV:}href").text = f"opaquelocktoken:{lock_dict['token']}" return etree.tostring(prop, encoding="utf-8", xml_declaration=True).decode() def get_last_modified(self, path): if not path.exists(): return None timestamp = path.stat().st_mtime dt = datetime.datetime.utcfromtimestamp(timestamp) return dt.strftime("%a, %d %b %Y %H:%M:%S GMT") def get_props_file_path(self, resource_path): if resource_path.is_dir(): return resource_path / ".webdav_props.json" else: return resource_path.parent / f".{resource_path.name}.webdav_props.json" async def load_properties(self, resource_path): props_file = self.get_props_file_path(resource_path) if props_file.exists(): async with aiofiles.open(props_file, "r") as f: content = await f.read() return json.loads(content) return {} async def save_properties(self, resource_path, properties): props_file = self.get_props_file_path(resource_path) async with aiofiles.open(props_file, "w") as f: await f.write(json.dumps(properties, indent=2)) async def delete_properties_file(self, resource_path): props_file = self.get_props_file_path(resource_path) if props_file.exists(): props_file.unlink() async def handle_propget(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") properties = await self.load_properties(abs_path) body = await request.read() requested_props = [] if body: try: root = etree.fromstring(body) prop_elem = root.find(".//{DAV:}prop") if prop_elem is not None: for child in prop_elem: ns = child.nsmap.get(None, "DAV:") requested_props.append(f"{{{ns}}}{child.tag.split('}')[-1]}") except: pass nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") relative_path = abs_path.relative_to(request["home"]).as_posix() href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/") href.text = href_path propstat = etree.SubElement(response, "{DAV:}propstat") prop = etree.SubElement(propstat, "{DAV:}prop") if requested_props: for prop_name in requested_props: if prop_name in properties: elem = etree.SubElement(prop, prop_name) elem.text = str(properties[prop_name]) else: for prop_name, prop_value in properties.items(): elem = etree.SubElement(prop, prop_name) elem.text = str(prop_value) etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode() return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml") async def handle_propset(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") body = await request.read() if not body: return aiohttp.web.Response(status=400, text="Bad Request") try: root = etree.fromstring(body) prop_elem = root.find(".//{DAV:}prop") if prop_elem is None: return aiohttp.web.Response(status=400, text="Bad Request") properties = await self.load_properties(abs_path) for child in prop_elem: ns = child.nsmap.get(None, "DAV:") prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" prop_value = child.text or "" properties[prop_name] = prop_value await self.save_properties(abs_path, properties) nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") relative_path = abs_path.relative_to(request["home"]).as_posix() href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/") href.text = href_path propstat = etree.SubElement(response, "{DAV:}propstat") prop = etree.SubElement(propstat, "{DAV:}prop") for child in prop_elem: ns = child.nsmap.get(None, "DAV:") prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" elem = etree.SubElement(prop, prop_name) etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode() return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml") except Exception as e: logging.error(f"PROPSET error: {e}") return aiohttp.web.Response(status=400, text="Bad Request") async def handle_propdel(self, request): if not await self.authenticate(request): return aiohttp.web.Response( status=401, headers={"WWW-Authenticate": 'Basic realm="WebDAV"'} ) requested_path = request.match_info.get("filename", "") abs_path = request["home"] / requested_path if not abs_path.exists(): return aiohttp.web.Response(status=404, text="Resource not found") lock = await self.is_locked(abs_path, request) if lock: submitted = request.headers.get("Lock-Token", "").strip(" <>").replace("opaquelocktoken:", "") if submitted != lock["token"]: return aiohttp.web.Response(status=423, text="Locked") body = await request.read() if not body: await self.delete_properties_file(abs_path) return aiohttp.web.Response(status=204) try: root = etree.fromstring(body) prop_elem = root.find(".//{DAV:}prop") if prop_elem is None: return aiohttp.web.Response(status=400, text="Bad Request") properties = await self.load_properties(abs_path) for child in prop_elem: ns = child.nsmap.get(None, "DAV:") prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" if prop_name in properties: del properties[prop_name] if properties: await self.save_properties(abs_path, properties) else: await self.delete_properties_file(abs_path) nsmap = {"D": "DAV:"} response_xml = etree.Element("{DAV:}multistatus", nsmap=nsmap) response = etree.SubElement(response_xml, "{DAV:}response") href = etree.SubElement(response, "{DAV:}href") relative_path = abs_path.relative_to(request["home"]).as_posix() href_path = f"{self.relative_url}/{relative_path}".strip(".").replace("./", "/").replace("//", "/") href.text = href_path propstat = etree.SubElement(response, "{DAV:}propstat") prop = etree.SubElement(propstat, "{DAV:}prop") for child in prop_elem: ns = child.nsmap.get(None, "DAV:") prop_name = f"{{{ns}}}{child.tag.split('}')[-1]}" elem = etree.SubElement(prop, prop_name) etree.SubElement(propstat, "{DAV:}status").text = "HTTP/1.1 200 OK" xml_output = etree.tostring(response_xml, encoding="utf-8", xml_declaration=True).decode() return aiohttp.web.Response(status=207, text=xml_output, content_type="application/xml") except Exception as e: logging.error(f"PROPDEL error: {e}") return aiohttp.web.Response(status=400, text="Bad Request")