841 lines
31 KiB
Python
Raw Normal View History

2025-11-09 23:29:07 +01:00
from fastapi import APIRouter, Request, Response, Depends, HTTPException, status, Header
2025-11-10 00:28:48 +01:00
from fastapi.responses import StreamingResponse
2025-11-09 23:29:07 +01:00
from typing import Optional
from xml.etree import ElementTree as ET
from datetime import datetime
import hashlib
import mimetypes
import os
import base64
from urllib.parse import unquote, urlparse
from .auth import get_current_user, verify_password
2025-11-10 00:28:48 +01:00
from .models import User, File, Folder, WebDAVProperty
2025-11-09 23:29:07 +01:00
from .storage import storage_manager
from .activity import log_activity
router = APIRouter(
prefix="/webdav",
tags=["webdav"],
)
class WebDAVLock:
locks = {}
@classmethod
def create_lock(cls, path: str, user_id: int, timeout: int = 3600):
lock_token = f"opaquelocktoken:{hashlib.md5(f'{path}{user_id}{datetime.now()}'.encode()).hexdigest()}"
cls.locks[path] = {
'token': lock_token,
'user_id': user_id,
'created_at': datetime.now(),
'timeout': timeout
}
return lock_token
@classmethod
def get_lock(cls, path: str):
return cls.locks.get(path)
@classmethod
def remove_lock(cls, path: str):
if path in cls.locks:
del cls.locks[path]
async def basic_auth(authorization: Optional[str] = Header(None)):
if not authorization:
return None
try:
scheme, credentials = authorization.split()
if scheme.lower() != 'basic':
return None
decoded = base64.b64decode(credentials).decode('utf-8')
username, password = decoded.split(':', 1)
user = await User.get_or_none(username=username)
if user and verify_password(password, user.hashed_password):
return user
2025-11-10 15:46:40 +01:00
except (ValueError, UnicodeDecodeError, base64.binascii.Error):
return None
2025-11-09 23:29:07 +01:00
return None
async def webdav_auth(request: Request, authorization: Optional[str] = Header(None)):
user = await basic_auth(authorization)
if user:
return user
try:
user = await get_current_user(request)
return user
2025-11-10 15:46:40 +01:00
except HTTPException:
2025-11-09 23:29:07 +01:00
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={'WWW-Authenticate': 'Basic realm="RBox WebDAV"'}
)
async def resolve_path(path_str: str, user: User):
if not path_str or path_str == '/':
return None, None, True
parts = [p for p in path_str.split('/') if p]
if not parts:
return None, None, True
current_folder = None
for i, part in enumerate(parts[:-1]):
folder = await Folder.get_or_none(
name=part,
parent=current_folder,
owner=user,
is_deleted=False
)
if not folder:
return None, None, False
current_folder = folder
last_part = parts[-1]
folder = await Folder.get_or_none(
name=last_part,
parent=current_folder,
owner=user,
is_deleted=False
)
if folder:
return folder, current_folder, True
file = await File.get_or_none(
name=last_part,
parent=current_folder,
owner=user,
is_deleted=False
)
if file:
return file, current_folder, True
return None, current_folder, True
def build_href(base_path: str, name: str, is_collection: bool):
path = f"{base_path.rstrip('/')}/{name}"
if is_collection:
path += '/'
return path
2025-11-10 00:28:48 +01:00
async def get_custom_properties(resource_type: str, resource_id: int):
props = await WebDAVProperty.filter(
resource_type=resource_type,
resource_id=resource_id
)
return {(prop.namespace, prop.name): prop.value for prop in props}
def create_propstat_element(props: dict, custom_props: dict = None, status: str = "HTTP/1.1 200 OK"):
2025-11-09 23:29:07 +01:00
propstat = ET.Element("D:propstat")
prop = ET.SubElement(propstat, "D:prop")
for key, value in props.items():
if key == "resourcetype":
resourcetype = ET.SubElement(prop, "D:resourcetype")
if value == "collection":
ET.SubElement(resourcetype, "D:collection")
elif key == "getcontentlength":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = str(value)
elif key == "getcontenttype":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = value
elif key == "getlastmodified":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = value.strftime('%a, %d %b %Y %H:%M:%S GMT')
elif key == "creationdate":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = value.isoformat() + 'Z'
elif key == "displayname":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = value
elif key == "getetag":
elem = ET.SubElement(prop, f"D:{key}")
elem.text = value
2025-11-10 00:28:48 +01:00
if custom_props:
for (namespace, name), value in custom_props.items():
if namespace == "DAV:":
continue
elem = ET.SubElement(prop, f"{{{namespace}}}{name}")
elem.text = value
2025-11-09 23:29:07 +01:00
status_elem = ET.SubElement(propstat, "D:status")
status_elem.text = status
return propstat
2025-11-10 00:28:48 +01:00
def parse_propfind_body(body: bytes):
if not body:
return None
try:
root = ET.fromstring(body)
allprop = root.find(".//{DAV:}allprop")
if allprop is not None:
return "allprop"
propname = root.find(".//{DAV:}propname")
if propname is not None:
return "propname"
prop = root.find(".//{DAV:}prop")
if prop is not None:
requested_props = []
for child in prop:
ns = child.tag.split('}')[0][1:] if '}' in child.tag else "DAV:"
name = child.tag.split('}')[1] if '}' in child.tag else child.tag
requested_props.append((ns, name))
return requested_props
2025-11-10 15:46:40 +01:00
except ET.ParseError:
return None
2025-11-10 00:28:48 +01:00
return None
2025-11-09 23:29:07 +01:00
@router.api_route("/{full_path:path}", methods=["OPTIONS"])
async def webdav_options(full_path: str):
return Response(
status_code=200,
headers={
"DAV": "1, 2",
"Allow": "OPTIONS, GET, HEAD, POST, PUT, DELETE, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE, LOCK, UNLOCK",
"MS-Author-Via": "DAV"
}
)
@router.api_route("/{full_path:path}", methods=["PROPFIND"])
async def handle_propfind(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
depth = request.headers.get("Depth", "1")
full_path = unquote(full_path).strip('/')
2025-11-10 00:28:48 +01:00
body = await request.body()
requested_props = parse_propfind_body(body)
2025-11-09 23:29:07 +01:00
resource, parent, exists = await resolve_path(full_path, current_user)
if not exists and resource is None:
raise HTTPException(status_code=404, detail="Not found")
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
base_href = f"/webdav/{full_path}" if full_path else "/webdav/"
if resource is None:
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = base_href if base_href.endswith('/') else base_href + '/'
props = {
"resourcetype": "collection",
"displayname": full_path.split('/')[-1] if full_path else "Root",
"creationdate": datetime.now(),
"getlastmodified": datetime.now()
}
response.append(create_propstat_element(props))
if depth in ["1", "infinity"]:
folders = await Folder.filter(owner=current_user, parent=parent, is_deleted=False)
files = await File.filter(owner=current_user, parent=parent, is_deleted=False)
for folder in folders:
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = build_href(base_href, folder.name, True)
props = {
"resourcetype": "collection",
"displayname": folder.name,
"creationdate": folder.created_at,
"getlastmodified": folder.updated_at
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("folder", folder.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
for file in files:
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = build_href(base_href, file.name, False)
props = {
"resourcetype": "",
"displayname": file.name,
"getcontentlength": file.size,
"getcontenttype": file.mime_type,
"creationdate": file.created_at,
"getlastmodified": file.updated_at,
"getetag": f'"{file.file_hash}"'
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("file", file.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
elif isinstance(resource, Folder):
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = base_href if base_href.endswith('/') else base_href + '/'
props = {
"resourcetype": "collection",
"displayname": resource.name,
"creationdate": resource.created_at,
"getlastmodified": resource.updated_at
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("folder", resource.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
if depth in ["1", "infinity"]:
folders = await Folder.filter(owner=current_user, parent=resource, is_deleted=False)
files = await File.filter(owner=current_user, parent=resource, is_deleted=False)
for folder in folders:
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = build_href(base_href, folder.name, True)
props = {
"resourcetype": "collection",
"displayname": folder.name,
"creationdate": folder.created_at,
"getlastmodified": folder.updated_at
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("folder", folder.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
for file in files:
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = build_href(base_href, file.name, False)
props = {
"resourcetype": "",
"displayname": file.name,
"getcontentlength": file.size,
"getcontenttype": file.mime_type,
"creationdate": file.created_at,
"getlastmodified": file.updated_at,
"getetag": f'"{file.file_hash}"'
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("file", file.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
elif isinstance(resource, File):
response = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response, "D:href")
href.text = base_href
props = {
"resourcetype": "",
"displayname": resource.name,
"getcontentlength": resource.size,
"getcontenttype": resource.mime_type,
"creationdate": resource.created_at,
"getlastmodified": resource.updated_at,
"getetag": f'"{resource.file_hash}"'
}
2025-11-10 00:28:48 +01:00
custom_props = await get_custom_properties("file", resource.id) if requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props)) else None
response.append(create_propstat_element(props, custom_props))
2025-11-09 23:29:07 +01:00
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
return Response(content=xml_content, media_type="application/xml; charset=utf-8", status_code=207)
@router.api_route("/{full_path:path}", methods=["GET", "HEAD"])
async def handle_get(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
resource, parent, exists = await resolve_path(full_path, current_user)
if not isinstance(resource, File):
raise HTTPException(status_code=404, detail="File not found")
try:
if request.method == "HEAD":
return Response(
status_code=200,
headers={
"Content-Length": str(resource.size),
"Content-Type": resource.mime_type,
"ETag": f'"{resource.file_hash}"',
"Last-Modified": resource.updated_at.strftime('%a, %d %b %Y %H:%M:%S GMT')
}
)
async def file_iterator():
async for chunk in storage_manager.get_file(current_user.id, resource.path):
yield chunk
2025-11-10 00:28:48 +01:00
return StreamingResponse(
2025-11-09 23:29:07 +01:00
content=file_iterator(),
media_type=resource.mime_type,
headers={
"Content-Disposition": f'attachment; filename="{resource.name}"',
"ETag": f'"{resource.file_hash}"',
"Last-Modified": resource.updated_at.strftime('%a, %d %b %Y %H:%M:%S GMT')
}
)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="File not found in storage")
@router.api_route("/{full_path:path}", methods=["PUT"])
async def handle_put(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
if not full_path:
raise HTTPException(status_code=400, detail="Cannot PUT to root")
parts = [p for p in full_path.split('/') if p]
file_name = parts[-1]
parent_path = '/'.join(parts[:-1]) if len(parts) > 1 else ''
_, parent_folder, exists = await resolve_path(parent_path, current_user)
if not exists:
raise HTTPException(status_code=409, detail="Parent folder does not exist")
file_content = await request.body()
file_size = len(file_content)
if current_user.used_storage_bytes + file_size > current_user.storage_quota_bytes:
raise HTTPException(status_code=507, detail="Storage quota exceeded")
file_hash = hashlib.sha256(file_content).hexdigest()
file_extension = os.path.splitext(file_name)[1]
unique_filename = f"{file_hash}{file_extension}"
storage_path = os.path.join(str(current_user.id), unique_filename)
await storage_manager.save_file(current_user.id, storage_path, file_content)
mime_type, _ = mimetypes.guess_type(file_name)
if not mime_type:
mime_type = "application/octet-stream"
existing_file = await File.get_or_none(
name=file_name,
parent=parent_folder,
owner=current_user,
is_deleted=False
)
if existing_file:
old_size = existing_file.size
existing_file.path = storage_path
existing_file.size = file_size
existing_file.mime_type = mime_type
existing_file.file_hash = file_hash
existing_file.updated_at = datetime.now()
await existing_file.save()
current_user.used_storage_bytes = current_user.used_storage_bytes - old_size + file_size
await current_user.save()
await log_activity(current_user, "file_updated", "file", existing_file.id)
return Response(status_code=204)
else:
db_file = await File.create(
name=file_name,
path=storage_path,
size=file_size,
mime_type=mime_type,
file_hash=file_hash,
owner=current_user,
parent=parent_folder
)
current_user.used_storage_bytes += file_size
await current_user.save()
await log_activity(current_user, "file_created", "file", db_file.id)
return Response(status_code=201)
@router.api_route("/{full_path:path}", methods=["DELETE"])
async def handle_delete(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
if not full_path:
raise HTTPException(status_code=400, detail="Cannot DELETE root")
resource, parent, exists = await resolve_path(full_path, current_user)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
if isinstance(resource, File):
resource.is_deleted = True
resource.deleted_at = datetime.now()
await resource.save()
await log_activity(current_user, "file_deleted", "file", resource.id)
elif isinstance(resource, Folder):
resource.is_deleted = True
await resource.save()
await log_activity(current_user, "folder_deleted", "folder", resource.id)
return Response(status_code=204)
@router.api_route("/{full_path:path}", methods=["MKCOL"])
async def handle_mkcol(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
if not full_path:
raise HTTPException(status_code=400, detail="Cannot MKCOL at root")
parts = [p for p in full_path.split('/') if p]
folder_name = parts[-1]
parent_path = '/'.join(parts[:-1]) if len(parts) > 1 else ''
_, parent_folder, exists = await resolve_path(parent_path, current_user)
if not exists:
raise HTTPException(status_code=409, detail="Parent folder does not exist")
existing = await Folder.get_or_none(
name=folder_name,
parent=parent_folder,
owner=current_user,
is_deleted=False
)
if existing:
raise HTTPException(status_code=405, detail="Folder already exists")
folder = await Folder.create(
name=folder_name,
parent=parent_folder,
owner=current_user
)
await log_activity(current_user, "folder_created", "folder", folder.id)
return Response(status_code=201)
@router.api_route("/{full_path:path}", methods=["COPY"])
async def handle_copy(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
destination = request.headers.get("Destination")
overwrite = request.headers.get("Overwrite", "T")
if not destination:
raise HTTPException(status_code=400, detail="Destination header required")
dest_path = unquote(urlparse(destination).path)
dest_path = dest_path.replace('/webdav/', '').strip('/')
source_resource, _, exists = await resolve_path(full_path, current_user)
if not source_resource:
raise HTTPException(status_code=404, detail="Source not found")
if not isinstance(source_resource, File):
raise HTTPException(status_code=501, detail="Only file copy is implemented")
dest_parts = [p for p in dest_path.split('/') if p]
dest_name = dest_parts[-1]
dest_parent_path = '/'.join(dest_parts[:-1]) if len(dest_parts) > 1 else ''
_, dest_parent, exists = await resolve_path(dest_parent_path, current_user)
if not exists:
raise HTTPException(status_code=409, detail="Destination parent does not exist")
existing_dest = await File.get_or_none(
name=dest_name,
parent=dest_parent,
owner=current_user,
is_deleted=False
)
if existing_dest and overwrite == "F":
raise HTTPException(status_code=412, detail="Destination exists and overwrite is false")
if existing_dest:
await existing_dest.delete()
new_file = await File.create(
name=dest_name,
path=source_resource.path,
size=source_resource.size,
mime_type=source_resource.mime_type,
file_hash=source_resource.file_hash,
owner=current_user,
parent=dest_parent
)
await log_activity(current_user, "file_copied", "file", new_file.id)
return Response(status_code=201 if not existing_dest else 204)
@router.api_route("/{full_path:path}", methods=["MOVE"])
async def handle_move(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
destination = request.headers.get("Destination")
overwrite = request.headers.get("Overwrite", "T")
if not destination:
raise HTTPException(status_code=400, detail="Destination header required")
dest_path = unquote(urlparse(destination).path)
dest_path = dest_path.replace('/webdav/', '').strip('/')
source_resource, _, exists = await resolve_path(full_path, current_user)
if not source_resource:
raise HTTPException(status_code=404, detail="Source not found")
dest_parts = [p for p in dest_path.split('/') if p]
dest_name = dest_parts[-1]
dest_parent_path = '/'.join(dest_parts[:-1]) if len(dest_parts) > 1 else ''
_, dest_parent, exists = await resolve_path(dest_parent_path, current_user)
if not exists:
raise HTTPException(status_code=409, detail="Destination parent does not exist")
if isinstance(source_resource, File):
existing_dest = await File.get_or_none(
name=dest_name,
parent=dest_parent,
owner=current_user,
is_deleted=False
)
if existing_dest and overwrite == "F":
raise HTTPException(status_code=412, detail="Destination exists and overwrite is false")
if existing_dest:
await existing_dest.delete()
source_resource.name = dest_name
source_resource.parent = dest_parent
await source_resource.save()
await log_activity(current_user, "file_moved", "file", source_resource.id)
return Response(status_code=201 if not existing_dest else 204)
elif isinstance(source_resource, Folder):
existing_dest = await Folder.get_or_none(
name=dest_name,
parent=dest_parent,
owner=current_user,
is_deleted=False
)
if existing_dest and overwrite == "F":
raise HTTPException(status_code=412, detail="Destination exists and overwrite is false")
if existing_dest:
existing_dest.is_deleted = True
await existing_dest.save()
source_resource.name = dest_name
source_resource.parent = dest_parent
await source_resource.save()
await log_activity(current_user, "folder_moved", "folder", source_resource.id)
return Response(status_code=201 if not existing_dest else 204)
@router.api_route("/{full_path:path}", methods=["LOCK"])
async def handle_lock(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
timeout_header = request.headers.get("Timeout", "Second-3600")
timeout = 3600
if timeout_header.startswith("Second-"):
try:
timeout = int(timeout_header.split("-")[1])
2025-11-10 15:46:40 +01:00
except (ValueError, IndexError):
2025-11-09 23:29:07 +01:00
pass
lock_token = WebDAVLock.create_lock(full_path, current_user.id, timeout)
lockinfo = ET.Element("D:prop", {"xmlns:D": "DAV:"})
lockdiscovery = ET.SubElement(lockinfo, "D:lockdiscovery")
activelock = ET.SubElement(lockdiscovery, "D:activelock")
locktype = ET.SubElement(activelock, "D:locktype")
ET.SubElement(locktype, "D:write")
lockscope = ET.SubElement(activelock, "D:lockscope")
ET.SubElement(lockscope, "D:exclusive")
depth_elem = ET.SubElement(activelock, "D:depth")
depth_elem.text = "0"
owner = ET.SubElement(activelock, "D:owner")
owner_href = ET.SubElement(owner, "D:href")
owner_href.text = current_user.username
timeout_elem = ET.SubElement(activelock, "D:timeout")
timeout_elem.text = f"Second-{timeout}"
locktoken_elem = ET.SubElement(activelock, "D:locktoken")
href = ET.SubElement(locktoken_elem, "D:href")
href.text = lock_token
xml_content = ET.tostring(lockinfo, encoding="utf-8", xml_declaration=True)
return Response(
content=xml_content,
media_type="application/xml; charset=utf-8",
status_code=200,
headers={"Lock-Token": f"<{lock_token}>"}
)
@router.api_route("/{full_path:path}", methods=["UNLOCK"])
async def handle_unlock(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
lock_token_header = request.headers.get("Lock-Token")
if not lock_token_header:
raise HTTPException(status_code=400, detail="Lock-Token header required")
lock_token = lock_token_header.strip('<>')
existing_lock = WebDAVLock.get_lock(full_path)
if not existing_lock or existing_lock['token'] != lock_token:
raise HTTPException(status_code=409, detail="Invalid lock token")
if existing_lock['user_id'] != current_user.id:
raise HTTPException(status_code=403, detail="Not lock owner")
WebDAVLock.remove_lock(full_path)
return Response(status_code=204)
2025-11-10 00:28:48 +01:00
@router.api_route("/{full_path:path}", methods=["PROPPATCH"])
async def handle_proppatch(request: Request, full_path: str, current_user: User = Depends(webdav_auth)):
full_path = unquote(full_path).strip('/')
resource, parent, exists = await resolve_path(full_path, current_user)
if not resource:
raise HTTPException(status_code=404, detail="Resource not found")
body = await request.body()
if not body:
raise HTTPException(status_code=400, detail="Request body required")
try:
root = ET.fromstring(body)
except:
raise HTTPException(status_code=400, detail="Invalid XML")
resource_type = "file" if isinstance(resource, File) else "folder"
resource_id = resource.id
set_props = []
remove_props = []
failed_props = []
set_element = root.find(".//{DAV:}set")
if set_element is not None:
prop_element = set_element.find(".//{DAV:}prop")
if prop_element is not None:
for child in prop_element:
ns = child.tag.split('}')[0][1:] if '}' in child.tag else "DAV:"
name = child.tag.split('}')[1] if '}' in child.tag else child.tag
value = child.text or ""
if ns == "DAV:":
live_props = ["creationdate", "getcontentlength", "getcontenttype",
"getetag", "getlastmodified", "resourcetype"]
if name in live_props:
failed_props.append((ns, name, "409 Conflict"))
continue
try:
existing_prop = await WebDAVProperty.get_or_none(
resource_type=resource_type,
resource_id=resource_id,
namespace=ns,
name=name
)
if existing_prop:
existing_prop.value = value
await existing_prop.save()
else:
await WebDAVProperty.create(
resource_type=resource_type,
resource_id=resource_id,
namespace=ns,
name=name,
value=value
)
set_props.append((ns, name))
except Exception as e:
failed_props.append((ns, name, "500 Internal Server Error"))
remove_element = root.find(".//{DAV:}remove")
if remove_element is not None:
prop_element = remove_element.find(".//{DAV:}prop")
if prop_element is not None:
for child in prop_element:
ns = child.tag.split('}')[0][1:] if '}' in child.tag else "DAV:"
name = child.tag.split('}')[1] if '}' in child.tag else child.tag
if ns == "DAV:":
failed_props.append((ns, name, "409 Conflict"))
continue
try:
existing_prop = await WebDAVProperty.get_or_none(
resource_type=resource_type,
resource_id=resource_id,
namespace=ns,
name=name
)
if existing_prop:
await existing_prop.delete()
remove_props.append((ns, name))
else:
failed_props.append((ns, name, "404 Not Found"))
except Exception as e:
failed_props.append((ns, name, "500 Internal Server Error"))
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
response_elem = ET.SubElement(multistatus, "D:response")
href = ET.SubElement(response_elem, "D:href")
href.text = f"/webdav/{full_path}"
if set_props or remove_props:
propstat = ET.SubElement(response_elem, "D:propstat")
prop = ET.SubElement(propstat, "D:prop")
for ns, name in set_props + remove_props:
if ns == "DAV:":
ET.SubElement(prop, f"D:{name}")
else:
ET.SubElement(prop, f"{{{ns}}}{name}")
status_elem = ET.SubElement(propstat, "D:status")
status_elem.text = "HTTP/1.1 200 OK"
if failed_props:
prop_by_status = {}
for ns, name, status_text in failed_props:
if status_text not in prop_by_status:
prop_by_status[status_text] = []
prop_by_status[status_text].append((ns, name))
for status_text, props_list in prop_by_status.items():
propstat = ET.SubElement(response_elem, "D:propstat")
prop = ET.SubElement(propstat, "D:prop")
for ns, name in props_list:
if ns == "DAV:":
ET.SubElement(prop, f"D:{name}")
else:
ET.SubElement(prop, f"{{{ns}}}{name}")
status_elem = ET.SubElement(propstat, "D:status")
status_elem.text = f"HTTP/1.1 {status_text}"
await log_activity(current_user, "properties_modified", resource_type, resource_id)
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
return Response(content=xml_content, media_type="application/xml; charset=utf-8", status_code=207)