|
from fastapi import APIRouter, Request, Response, Depends, HTTPException, status, Header
|
|
from fastapi.responses import StreamingResponse
|
|
from typing import Optional
|
|
from xml.etree import ElementTree as ET
|
|
from datetime import datetime
|
|
import hashlib
|
|
import mimetypes
|
|
import os
|
|
import base64
|
|
from urllib.parse import unquote, urlparse
|
|
|
|
from .auth import get_current_user, verify_password
|
|
from .models import User, File, Folder, WebDAVProperty
|
|
from .storage import storage_manager
|
|
from .activity import log_activity
|
|
from .settings import settings
|
|
from jose import JWTError, jwt
|
|
|
|
router = APIRouter(
|
|
prefix="/webdav",
|
|
tags=["webdav"],
|
|
)
|
|
|
|
|
|
class WebDAVLock:
|
|
locks = {}
|
|
|
|
@classmethod
|
|
def create_lock(cls, path: str, user_id: int, timeout: int = 3600):
|
|
lock_token = f"opaquelocktoken:{hashlib.md5(f'{path}{user_id}{datetime.now()}'.encode()).hexdigest()}"
|
|
cls.locks[path] = {
|
|
"token": lock_token,
|
|
"user_id": user_id,
|
|
"created_at": datetime.now(),
|
|
"timeout": timeout,
|
|
}
|
|
return lock_token
|
|
|
|
@classmethod
|
|
def get_lock(cls, path: str):
|
|
return cls.locks.get(path)
|
|
|
|
@classmethod
|
|
def remove_lock(cls, path: str):
|
|
if path in cls.locks:
|
|
del cls.locks[path]
|
|
|
|
|
|
async def basic_auth(authorization: Optional[str] = Header(None)):
|
|
if not authorization:
|
|
return None
|
|
|
|
try:
|
|
scheme, credentials = authorization.split()
|
|
if scheme.lower() != "basic":
|
|
return None
|
|
|
|
decoded = base64.b64decode(credentials).decode("utf-8")
|
|
username, password = decoded.split(":", 1)
|
|
|
|
user = await User.get_or_none(username=username)
|
|
if user and verify_password(password, user.hashed_password):
|
|
return user
|
|
except (ValueError, UnicodeDecodeError, base64.binascii.Error):
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
async def webdav_auth(request: Request, authorization: Optional[str] = Header(None)):
|
|
# First, try Basic Auth, which is common for WebDAV clients
|
|
user = await basic_auth(authorization)
|
|
if user:
|
|
return user
|
|
|
|
# If Basic Auth fails or is not provided, try to authenticate using the session cookie
|
|
token = request.cookies.get("access_token")
|
|
if token:
|
|
try:
|
|
payload = jwt.decode(
|
|
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
|
|
)
|
|
username: str = payload.get("sub")
|
|
if username:
|
|
user = await User.get_or_none(username=username)
|
|
if user:
|
|
return user
|
|
except JWTError:
|
|
# Token is invalid, fall through to the final exception
|
|
pass
|
|
|
|
# If all authentication methods fail, raise 401
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
headers={"WWW-Authenticate": 'Basic realm="MyWebdav WebDAV"'},
|
|
)
|
|
|
|
|
|
async def resolve_path(path_str: str, user: User):
|
|
"""
|
|
Resolves a path string to a resource.
|
|
Returns a tuple: (resource, parent_folder, exists)
|
|
- resource: The File or Folder object at the path, or None if not found.
|
|
- parent_folder: The parent Folder object, or None if root.
|
|
- exists: Boolean indicating if the resource at the given path exists.
|
|
"""
|
|
path_str = path_str.strip("/")
|
|
if not path_str: # Root directory
|
|
# The root exists conceptually, but has no specific resource object.
|
|
# It contains top-level files and folders.
|
|
return None, None, True
|
|
|
|
parts = [p for p in path_str.split("/") if p]
|
|
current_folder = None
|
|
# Traverse the path to find the parent of the target resource
|
|
for i, part in enumerate(parts[:-1]):
|
|
folder = await Folder.get_or_none(
|
|
name=part, parent=current_folder, owner=user, is_deleted=False
|
|
)
|
|
if not folder:
|
|
# A component in the middle of the path does not exist, so the full path cannot exist.
|
|
return None, None, False
|
|
current_folder = folder
|
|
|
|
last_part = parts[-1]
|
|
|
|
# Check for the target resource itself (can be a folder or a file)
|
|
folder = await Folder.get_or_none(
|
|
name=last_part, parent=current_folder, owner=user, is_deleted=False
|
|
)
|
|
if folder:
|
|
return folder, current_folder, True
|
|
|
|
file = await File.get_or_none(
|
|
name=last_part, parent=current_folder, owner=user, is_deleted=False
|
|
)
|
|
if file:
|
|
return file, current_folder, True
|
|
|
|
# The resource itself was not found, but the path to its parent is valid.
|
|
return None, current_folder, False
|
|
|
|
|
|
def build_href(base_path: str, name: str, is_collection: bool):
|
|
path = f"{base_path.rstrip('/')}/{name}"
|
|
if is_collection:
|
|
path += "/"
|
|
return path
|
|
|
|
|
|
async def get_custom_properties(resource_type: str, resource_id: int):
|
|
props = await WebDAVProperty.filter(
|
|
resource_type=resource_type, resource_id=resource_id
|
|
)
|
|
return {(prop.namespace, prop.name): prop.value for prop in props}
|
|
|
|
|
|
def create_propstat_element(
|
|
props: dict, custom_props: dict = None, status: str = "HTTP/1.1 200 OK"
|
|
):
|
|
propstat = ET.Element("D:propstat")
|
|
prop = ET.SubElement(propstat, "D:prop")
|
|
|
|
for key, value in props.items():
|
|
if key == "resourcetype":
|
|
resourcetype = ET.SubElement(prop, "D:resourcetype")
|
|
if value == "collection":
|
|
ET.SubElement(resourcetype, "D:collection")
|
|
elif key == "getcontentlength":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = str(value)
|
|
elif key == "getcontenttype":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = value
|
|
elif key == "getlastmodified":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = value.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
|
elif key == "creationdate":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = value.isoformat() + "Z"
|
|
elif key == "displayname":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = value
|
|
elif key == "getetag":
|
|
elem = ET.SubElement(prop, f"D:{key}")
|
|
elem.text = value
|
|
|
|
if custom_props:
|
|
for (namespace, name), value in custom_props.items():
|
|
if namespace == "DAV:":
|
|
continue
|
|
elem = ET.SubElement(prop, f"{{{namespace}}}{name}")
|
|
elem.text = value
|
|
|
|
status_elem = ET.SubElement(propstat, "D:status")
|
|
status_elem.text = status
|
|
|
|
return propstat
|
|
|
|
|
|
def parse_propfind_body(body: bytes):
|
|
if not body:
|
|
return None
|
|
|
|
try:
|
|
root = ET.fromstring(body)
|
|
|
|
allprop = root.find(".//{DAV:}allprop")
|
|
if allprop is not None:
|
|
return "allprop"
|
|
|
|
propname = root.find(".//{DAV:}propname")
|
|
if propname is not None:
|
|
return "propname"
|
|
|
|
prop = root.find(".//{DAV:}prop")
|
|
if prop is not None:
|
|
requested_props = []
|
|
for child in prop:
|
|
ns = child.tag.split("}")[0][1:] if "}" in child.tag else "DAV:"
|
|
name = child.tag.split("}")[1] if "}" in child.tag else child.tag
|
|
requested_props.append((ns, name))
|
|
return requested_props
|
|
except ET.ParseError:
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["OPTIONS"])
|
|
async def webdav_options(full_path: str):
|
|
return Response(
|
|
status_code=200,
|
|
headers={
|
|
"DAV": "1, 2",
|
|
"Allow": "OPTIONS, GET, HEAD, POST, PUT, DELETE, PROPFIND, PROPPATCH, MKCOL, COPY, MOVE, LOCK, UNLOCK",
|
|
"MS-Author-Via": "DAV",
|
|
},
|
|
)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["PROPFIND"])
|
|
async def handle_propfind(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
depth = request.headers.get("Depth", "1")
|
|
full_path_str = unquote(full_path).strip("/")
|
|
body = await request.body()
|
|
requested_props = parse_propfind_body(body)
|
|
|
|
resource, parent_folder, exists = await resolve_path(full_path_str, current_user)
|
|
|
|
if not exists:
|
|
raise HTTPException(status_code=404, detail="Not found")
|
|
|
|
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
|
|
base_href = f"/webdav/{full_path_str}" if full_path_str else "/webdav/"
|
|
|
|
# This function adds a single resource to the multistatus response
|
|
async def add_resource_to_response(res, res_href):
|
|
response = ET.SubElement(multistatus, "D:response")
|
|
href = ET.SubElement(response, "D:href")
|
|
href.text = res_href
|
|
|
|
props = {}
|
|
custom_props = None
|
|
res_type = ""
|
|
res_id = None
|
|
|
|
if isinstance(res, Folder):
|
|
res_type, res_id = "folder", res.id
|
|
props = {
|
|
"resourcetype": "collection", "displayname": res.name,
|
|
"creationdate": res.created_at, "getlastmodified": res.updated_at,
|
|
}
|
|
elif isinstance(res, File):
|
|
res_type, res_id = "file", res.id
|
|
props = {
|
|
"resourcetype": "", "displayname": res.name,
|
|
"getcontentlength": res.size, "getcontenttype": res.mime_type,
|
|
"creationdate": res.created_at, "getlastmodified": res.updated_at,
|
|
"getetag": f'"{res.file_hash}"',
|
|
}
|
|
elif res is None and (full_path_str == "" or isinstance(resource, Folder)): # Root or empty folder
|
|
props = {
|
|
"resourcetype": "collection", "displayname": resource.name if resource else "Root",
|
|
"creationdate": resource.created_at if resource else datetime.now(),
|
|
"getlastmodified": resource.updated_at if resource else datetime.now(),
|
|
}
|
|
|
|
if res_type and (requested_props == "allprop" or (isinstance(requested_props, list) and any(ns != "DAV:" for ns, _ in requested_props))):
|
|
custom_props = await get_custom_properties(res_type, res_id)
|
|
|
|
response.append(create_propstat_element(props, custom_props))
|
|
|
|
# Add the main resource itself to the response
|
|
res_href = base_href if isinstance(resource, File) else (base_href if base_href.endswith('/') else base_href + '/')
|
|
await add_resource_to_response(resource, res_href)
|
|
|
|
# If depth is 1 or infinity, add children
|
|
if depth in ["1", "infinity"]:
|
|
target_folder = resource if isinstance(resource, Folder) else (None if full_path_str == "" else parent_folder)
|
|
|
|
folders = await Folder.filter(owner=current_user, parent=target_folder, is_deleted=False)
|
|
files = await File.filter(owner=current_user, parent=target_folder, is_deleted=False)
|
|
|
|
for folder in folders:
|
|
child_href = build_href(base_href, folder.name, True)
|
|
await add_resource_to_response(folder, child_href)
|
|
for file in files:
|
|
child_href = build_href(base_href, file.name, False)
|
|
await add_resource_to_response(file, child_href)
|
|
|
|
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
|
|
return Response(content=xml_content, media_type="application/xml; charset=utf-8", status_code=207)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["GET", "HEAD"])
|
|
async def handle_get(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
resource, _, exists = await resolve_path(full_path, current_user)
|
|
|
|
if not exists or not isinstance(resource, File):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
try:
|
|
if request.method == "HEAD":
|
|
return Response(
|
|
status_code=200,
|
|
headers={
|
|
"Content-Length": str(resource.size),
|
|
"Content-Type": resource.mime_type,
|
|
"ETag": f'"{resource.file_hash}"',
|
|
"Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
|
|
},
|
|
)
|
|
|
|
async def file_iterator():
|
|
async for chunk in storage_manager.get_file(current_user.id, resource.path):
|
|
yield chunk
|
|
|
|
return StreamingResponse(
|
|
content=file_iterator(),
|
|
media_type=resource.mime_type,
|
|
headers={
|
|
"Content-Disposition": f'attachment; filename="{resource.name}"',
|
|
"ETag": f'"{resource.file_hash}"',
|
|
"Last-Modified": resource.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
|
|
},
|
|
)
|
|
except FileNotFoundError:
|
|
raise HTTPException(status_code=404, detail="File not found in storage")
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["PUT"])
|
|
async def handle_put(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
if not full_path:
|
|
raise HTTPException(status_code=400, detail="Cannot PUT to root")
|
|
|
|
parts = [p for p in full_path.split("/") if p]
|
|
file_name = parts[-1]
|
|
parent_path = "/".join(parts[:-1])
|
|
|
|
parent_resource, _, parent_exists = await resolve_path(parent_path, current_user)
|
|
|
|
if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)):
|
|
raise HTTPException(status_code=409, detail="Parent collection does not exist")
|
|
|
|
parent_folder = parent_resource if isinstance(parent_resource, Folder) else None
|
|
|
|
file_content = await request.body()
|
|
file_size = len(file_content)
|
|
|
|
if current_user.used_storage_bytes + file_size > current_user.storage_quota_bytes:
|
|
raise HTTPException(status_code=507, detail="Storage quota exceeded")
|
|
|
|
file_hash = hashlib.sha256(file_content).hexdigest()
|
|
file_extension = os.path.splitext(file_name)[1]
|
|
unique_filename = f"{file_hash}{file_extension}"
|
|
storage_path = os.path.join(str(current_user.id), unique_filename)
|
|
|
|
await storage_manager.save_file(current_user.id, storage_path, file_content)
|
|
|
|
mime_type, _ = mimetypes.guess_type(file_name)
|
|
mime_type = mime_type or "application/octet-stream"
|
|
|
|
existing_file = await File.get_or_none(
|
|
name=file_name, parent=parent_folder, owner=current_user, is_deleted=False
|
|
)
|
|
|
|
if existing_file:
|
|
old_size = existing_file.size
|
|
existing_file.path, existing_file.size, existing_file.mime_type, existing_file.file_hash = \
|
|
storage_path, file_size, mime_type, file_hash
|
|
existing_file.updated_at = datetime.now()
|
|
await existing_file.save()
|
|
current_user.used_storage_bytes += (file_size - old_size)
|
|
await current_user.save()
|
|
await log_activity(current_user, "file_updated", "file", existing_file.id)
|
|
return Response(status_code=204)
|
|
else:
|
|
db_file = await File.create(
|
|
name=file_name, path=storage_path, size=file_size, mime_type=mime_type,
|
|
file_hash=file_hash, owner=current_user, parent=parent_folder
|
|
)
|
|
current_user.used_storage_bytes += file_size
|
|
await current_user.save()
|
|
await log_activity(current_user, "file_created", "file", db_file.id)
|
|
return Response(status_code=201)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["DELETE"])
|
|
async def handle_delete(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
if not full_path:
|
|
raise HTTPException(status_code=400, detail="Cannot DELETE root")
|
|
|
|
resource, _, exists = await resolve_path(full_path, current_user)
|
|
|
|
if not exists:
|
|
raise HTTPException(status_code=404, detail="Resource not found")
|
|
|
|
if isinstance(resource, File):
|
|
resource.is_deleted = True
|
|
resource.deleted_at = datetime.now()
|
|
await resource.save()
|
|
await log_activity(current_user, "file_deleted", "file", resource.id)
|
|
elif isinstance(resource, Folder):
|
|
child_files = await File.filter(parent=resource, is_deleted=False).count()
|
|
child_folders = await Folder.filter(parent=resource, is_deleted=False).count()
|
|
if child_files > 0 or child_folders > 0:
|
|
raise HTTPException(status_code=409, detail="Folder is not empty")
|
|
|
|
resource.is_deleted = True
|
|
resource.deleted_at = datetime.now()
|
|
await resource.save()
|
|
await log_activity(current_user, "folder_deleted", "folder", resource.id)
|
|
|
|
return Response(status_code=204)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["MKCOL"])
|
|
async def handle_mkcol(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
if not full_path:
|
|
raise HTTPException(status_code=405, detail="Cannot MKCOL at root")
|
|
|
|
resource, _, exists = await resolve_path(full_path, current_user)
|
|
if exists:
|
|
raise HTTPException(status_code=405, detail="Resource already exists")
|
|
|
|
parts = [p for p in full_path.split("/") if p]
|
|
folder_name = parts[-1]
|
|
parent_path = "/".join(parts[:-1])
|
|
|
|
parent_resource, _, parent_exists = await resolve_path(parent_path, current_user)
|
|
|
|
if not parent_exists or (parent_path and not isinstance(parent_resource, Folder)):
|
|
raise HTTPException(status_code=409, detail="Parent collection does not exist")
|
|
|
|
parent_folder = parent_resource if isinstance(parent_resource, Folder) else None
|
|
|
|
folder = await Folder.create(name=folder_name, parent=parent_folder, owner=current_user)
|
|
await log_activity(current_user, "folder_created", "folder", folder.id)
|
|
return Response(status_code=201)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["COPY"])
|
|
async def handle_copy(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
destination = request.headers.get("Destination")
|
|
overwrite = request.headers.get("Overwrite", "T").upper()
|
|
|
|
if not destination:
|
|
raise HTTPException(status_code=400, detail="Destination header required")
|
|
|
|
dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/")
|
|
source_resource, _, source_exists = await resolve_path(full_path, current_user)
|
|
|
|
if not source_exists:
|
|
raise HTTPException(status_code=404, detail="Source not found")
|
|
if not isinstance(source_resource, File):
|
|
raise HTTPException(status_code=501, detail="Only file copy is implemented")
|
|
|
|
dest_name = dest_path.split("/")[-1]
|
|
dest_parent_path = "/".join(dest_path.split("/")[:-1])
|
|
|
|
dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user)
|
|
|
|
if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)):
|
|
raise HTTPException(status_code=409, detail="Destination parent collection does not exist")
|
|
|
|
dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None
|
|
|
|
existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user)
|
|
|
|
if existing_dest_exists and overwrite == "F":
|
|
raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'")
|
|
|
|
if existing_dest_exists:
|
|
# Update existing_dest instead of deleting and recreating
|
|
existing_dest.path = source_resource.path
|
|
existing_dest.size = source_resource.size
|
|
existing_dest.mime_type = source_resource.mime_type
|
|
existing_dest.file_hash = source_resource.file_hash
|
|
existing_dest.updated_at = datetime.now()
|
|
await existing_dest.save()
|
|
await log_activity(current_user, "file_copied", "file", existing_dest.id)
|
|
return Response(status_code=204)
|
|
else:
|
|
new_file = await File.create(
|
|
name=dest_name, path=source_resource.path, size=source_resource.size,
|
|
mime_type=source_resource.mime_type, file_hash=source_resource.file_hash,
|
|
owner=current_user, parent=dest_parent_folder
|
|
)
|
|
await log_activity(current_user, "file_copied", "file", new_file.id)
|
|
return Response(status_code=201)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["MOVE"])
|
|
async def handle_move(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
destination = request.headers.get("Destination")
|
|
overwrite = request.headers.get("Overwrite", "T").upper()
|
|
|
|
if not destination:
|
|
raise HTTPException(status_code=400, detail="Destination header required")
|
|
|
|
dest_path = unquote(urlparse(destination).path).replace("/webdav/", "", 1).strip("/")
|
|
source_resource, _, source_exists = await resolve_path(full_path, current_user)
|
|
|
|
if not source_exists:
|
|
raise HTTPException(status_code=404, detail="Source not found")
|
|
|
|
dest_name = dest_path.split("/")[-1]
|
|
dest_parent_path = "/".join(dest_path.split("/")[:-1])
|
|
|
|
dest_parent_resource, _, dest_parent_exists = await resolve_path(dest_parent_path, current_user)
|
|
|
|
if not dest_parent_exists or (dest_parent_path and not isinstance(dest_parent_resource, Folder)):
|
|
raise HTTPException(status_code=409, detail="Destination parent collection does not exist")
|
|
|
|
dest_parent_folder = dest_parent_resource if isinstance(dest_parent_resource, Folder) else None
|
|
|
|
existing_dest, _, existing_dest_exists = await resolve_path(dest_path, current_user)
|
|
|
|
if existing_dest_exists and overwrite == "F":
|
|
raise HTTPException(status_code=412, detail="Destination exists and Overwrite is 'F'")
|
|
|
|
# Handle overwrite scenario
|
|
if existing_dest_exists:
|
|
if isinstance(source_resource, File):
|
|
# Update existing_dest with source_resource's properties
|
|
existing_dest.name = dest_name
|
|
existing_dest.path = source_resource.path
|
|
existing_dest.size = source_resource.size
|
|
existing_dest.mime_type = source_resource.mime_type
|
|
existing_dest.file_hash = source_resource.file_hash
|
|
existing_dest.parent = dest_parent_folder
|
|
existing_dest.updated_at = datetime.now()
|
|
await existing_dest.save()
|
|
await log_activity(current_user, "file_moved_overwrite", "file", existing_dest.id)
|
|
elif isinstance(source_resource, Folder):
|
|
raise HTTPException(status_code=501, detail="Folder move overwrite not implemented")
|
|
|
|
# Mark source as deleted
|
|
source_resource.is_deleted = True
|
|
source_resource.deleted_at = datetime.now()
|
|
await source_resource.save()
|
|
await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id)
|
|
|
|
return Response(status_code=204) # Overwrite means 204 No Content
|
|
|
|
# Handle non-overwrite scenario (create new resource at destination)
|
|
else:
|
|
if isinstance(source_resource, File):
|
|
new_file = await File.create(
|
|
name=dest_name, path=source_resource.path, size=source_resource.size,
|
|
mime_type=source_resource.mime_type, file_hash=source_resource.file_hash,
|
|
owner=current_user, parent=dest_parent_folder
|
|
)
|
|
await log_activity(current_user, "file_moved_created", "file", new_file.id)
|
|
elif isinstance(source_resource, Folder):
|
|
raise HTTPException(status_code=501, detail="Folder move not implemented")
|
|
|
|
# Mark source as deleted
|
|
source_resource.is_deleted = True
|
|
source_resource.deleted_at = datetime.now()
|
|
await source_resource.save()
|
|
await log_activity(current_user, "file_deleted_after_move", "file", source_resource.id)
|
|
|
|
return Response(status_code=201) # New resource created means 201 Created
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["LOCK"])
|
|
async def handle_lock(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
|
|
timeout_header = request.headers.get("Timeout", "Second-3600")
|
|
timeout = 3600
|
|
if timeout_header.startswith("Second-"):
|
|
try:
|
|
timeout = int(timeout_header.split("-")[1])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
lock_token = WebDAVLock.create_lock(full_path, current_user.id, timeout)
|
|
|
|
lockinfo = ET.Element("D:prop", {"xmlns:D": "DAV:"})
|
|
lockdiscovery = ET.SubElement(lockinfo, "D:lockdiscovery")
|
|
activelock = ET.SubElement(lockdiscovery, "D:activelock")
|
|
|
|
locktype = ET.SubElement(activelock, "D:locktype")
|
|
ET.SubElement(locktype, "D:write")
|
|
|
|
lockscope = ET.SubElement(activelock, "D:lockscope")
|
|
ET.SubElement(lockscope, "D:exclusive")
|
|
|
|
depth_elem = ET.SubElement(activelock, "D:depth")
|
|
depth_elem.text = "0"
|
|
|
|
owner = ET.SubElement(activelock, "D:owner")
|
|
owner_href = ET.SubElement(owner, "D:href")
|
|
owner_href.text = current_user.username
|
|
|
|
timeout_elem = ET.SubElement(activelock, "D:timeout")
|
|
timeout_elem.text = f"Second-{timeout}"
|
|
|
|
locktoken_elem = ET.SubElement(activelock, "D:locktoken")
|
|
href = ET.SubElement(locktoken_elem, "D:href")
|
|
href.text = lock_token
|
|
|
|
xml_content = ET.tostring(lockinfo, encoding="utf-8", xml_declaration=True)
|
|
|
|
return Response(
|
|
content=xml_content,
|
|
media_type="application/xml; charset=utf-8",
|
|
status_code=200,
|
|
headers={"Lock-Token": f"<{lock_token}>"},
|
|
)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["UNLOCK"])
|
|
async def handle_unlock(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
lock_token_header = request.headers.get("Lock-Token")
|
|
|
|
if not lock_token_header:
|
|
raise HTTPException(status_code=400, detail="Lock-Token header required")
|
|
|
|
lock_token = lock_token_header.strip("<>")
|
|
existing_lock = WebDAVLock.get_lock(full_path)
|
|
|
|
if not existing_lock or existing_lock["token"] != lock_token:
|
|
raise HTTPException(status_code=409, detail="Invalid lock token")
|
|
|
|
if existing_lock["user_id"] != current_user.id:
|
|
raise HTTPException(status_code=403, detail="Not lock owner")
|
|
|
|
WebDAVLock.remove_lock(full_path)
|
|
return Response(status_code=204)
|
|
|
|
|
|
@router.api_route("/{full_path:path}", methods=["PROPPATCH"])
|
|
async def handle_proppatch(
|
|
request: Request, full_path: str, current_user: User = Depends(webdav_auth)
|
|
):
|
|
full_path = unquote(full_path).strip("/")
|
|
|
|
resource, parent, exists = await resolve_path(full_path, current_user)
|
|
|
|
if not resource:
|
|
raise HTTPException(status_code=404, detail="Resource not found")
|
|
|
|
body = await request.body()
|
|
if not body:
|
|
raise HTTPException(status_code=400, detail="Request body required")
|
|
|
|
try:
|
|
root = ET.fromstring(body)
|
|
except Exception:
|
|
raise HTTPException(status_code=400, detail="Invalid XML")
|
|
|
|
resource_type = "file" if isinstance(resource, File) else "folder"
|
|
resource_id = resource.id
|
|
|
|
set_props = []
|
|
remove_props = []
|
|
failed_props = []
|
|
|
|
set_element = root.find(".//{DAV:}set")
|
|
if set_element is not None:
|
|
prop_element = set_element.find(".//{DAV:}prop")
|
|
if prop_element is not None:
|
|
for child in prop_element:
|
|
ns = child.tag.split("}")[0][1:] if "}" in child.tag else "DAV:"
|
|
name = child.tag.split("}")[1] if "}" in child.tag else child.tag
|
|
value = child.text or ""
|
|
|
|
if ns == "DAV:":
|
|
live_props = [
|
|
"creationdate",
|
|
"getcontentlength",
|
|
"getcontenttype",
|
|
"getetag",
|
|
"getlastmodified",
|
|
"resourcetype",
|
|
]
|
|
if name in live_props:
|
|
failed_props.append((ns, name, "409 Conflict"))
|
|
continue
|
|
|
|
try:
|
|
existing_prop = await WebDAVProperty.get_or_none(
|
|
resource_type=resource_type,
|
|
resource_id=resource_id,
|
|
namespace=ns,
|
|
name=name,
|
|
)
|
|
|
|
if existing_prop:
|
|
existing_prop.value = value
|
|
await existing_prop.save()
|
|
else:
|
|
await WebDAVProperty.create(
|
|
resource_type=resource_type,
|
|
resource_id=resource_id,
|
|
namespace=ns,
|
|
name=name,
|
|
value=value,
|
|
)
|
|
set_props.append((ns, name))
|
|
except Exception:
|
|
failed_props.append((ns, name, "500 Internal Server Error"))
|
|
|
|
remove_element = root.find(".//{DAV:}remove")
|
|
if remove_element is not None:
|
|
prop_element = remove_element.find(".//{DAV:}prop")
|
|
if prop_element is not None:
|
|
for child in prop_element:
|
|
ns = child.tag.split("}")[0][1:] if "}" in child.tag else "DAV:"
|
|
name = child.tag.split("}")[1] if "}" in child.tag else child.tag
|
|
|
|
if ns == "DAV:":
|
|
failed_props.append((ns, name, "409 Conflict"))
|
|
continue
|
|
|
|
try:
|
|
existing_prop = await WebDAVProperty.get_or_none(
|
|
resource_type=resource_type,
|
|
resource_id=resource_id,
|
|
namespace=ns,
|
|
name=name,
|
|
)
|
|
|
|
if existing_prop:
|
|
await existing_prop.delete()
|
|
remove_props.append((ns, name))
|
|
else:
|
|
failed_props.append((ns, name, "404 Not Found"))
|
|
except Exception:
|
|
failed_props.append((ns, name, "500 Internal Server Error"))
|
|
|
|
multistatus = ET.Element("D:multistatus", {"xmlns:D": "DAV:"})
|
|
response_elem = ET.SubElement(multistatus, "D:response")
|
|
href = ET.SubElement(response_elem, "D:href")
|
|
href.text = f"/webdav/{full_path}"
|
|
|
|
if set_props or remove_props:
|
|
propstat = ET.SubElement(response_elem, "D:propstat")
|
|
prop = ET.SubElement(propstat, "D:prop")
|
|
|
|
for ns, name in set_props + remove_props:
|
|
if ns == "DAV:":
|
|
ET.SubElement(prop, f"D:{name}")
|
|
else:
|
|
ET.SubElement(prop, f"{{{ns}}}{name}")
|
|
|
|
status_elem = ET.SubElement(propstat, "D:status")
|
|
status_elem.text = "HTTP/1.1 200 OK"
|
|
|
|
if failed_props:
|
|
prop_by_status = {}
|
|
for ns, name, status_text in failed_props:
|
|
if status_text not in prop_by_status:
|
|
prop_by_status[status_text] = []
|
|
prop_by_status[status_text].append((ns, name))
|
|
|
|
for status_text, props_list in prop_by_status.items():
|
|
propstat = ET.SubElement(response_elem, "D:propstat")
|
|
prop = ET.SubElement(propstat, "D:prop")
|
|
|
|
for ns, name in props_list:
|
|
if ns == "DAV:":
|
|
ET.SubElement(prop, f"D:{name}")
|
|
else:
|
|
ET.SubElement(prop, f"{{{ns}}}{name}")
|
|
|
|
status_elem = ET.SubElement(propstat, "D:status")
|
|
status_elem.text = f"HTTP/1.1 {status_text}"
|
|
|
|
await log_activity(current_user, "properties_modified", resource_type, resource_id)
|
|
|
|
xml_content = ET.tostring(multistatus, encoding="utf-8", xml_declaration=True)
|
|
return Response(
|
|
content=xml_content,
|
|
media_type="application/xml; charset=utf-8",
|
|
status_code=207,
|
|
)
|