|
# Written by retoor@molodetz.nl
|
|
|
|
# This code defines a web application for uploading and retrieving files.
|
|
# It includes functionality to upload files through a POST request and retrieve them via a GET request.
|
|
|
|
# The code uses the following non-standard imports:
|
|
# - snek.system.view.BaseView: For extending view functionalities.
|
|
# - aiofiles: For asynchronous file operations.
|
|
# - aiohttp: For managing web server requests and responses.
|
|
|
|
# MIT License: This software is licensed under the MIT License, a permissive free software license.
|
|
|
|
from snek.system.view import BaseView
|
|
import aiofiles
|
|
import pathlib
|
|
from aiohttp import web
|
|
import uuid
|
|
|
|
UPLOAD_DIR = pathlib.Path("./drive")
|
|
|
|
class UploadView(BaseView):
|
|
|
|
async def get(self):
|
|
uid = self.request.match_info.get("uid")
|
|
drive_item = await self.services.drive_item.get(uid)
|
|
response = web.FileResponse(drive_item["path"])
|
|
response.headers['Cache-Control'] = f'public, max-age={1337*420}'
|
|
return response
|
|
|
|
async def post(self):
|
|
reader = await self.request.multipart()
|
|
files = []
|
|
|
|
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
channel_uid = None
|
|
|
|
drive = await self.services.drive.get_or_create(user_uid=self.request.session.get("uid"))
|
|
|
|
extension_types = {
|
|
".jpg": "image",
|
|
".gif": "image",
|
|
".png": "image",
|
|
".jpeg": "image",
|
|
".mp4": "video",
|
|
".mp3": "audio",
|
|
".pdf": "document",
|
|
".doc": "document",
|
|
".docx": "document"
|
|
}
|
|
|
|
while field := await reader.next():
|
|
if field.name == "channel_uid":
|
|
channel_uid = await field.text()
|
|
continue
|
|
|
|
filename = field.filename
|
|
if not filename:
|
|
continue
|
|
|
|
file_path = pathlib.Path(UPLOAD_DIR).joinpath(filename.strip("/").strip("."))
|
|
files.append(file_path)
|
|
|
|
async with aiofiles.open(str(file_path.absolute()), 'wb') as f:
|
|
while chunk := await field.read_chunk():
|
|
await f.write(chunk)
|
|
|
|
drive_item = await self.services.drive_item.create(
|
|
drive["uid"], filename, str(file_path.absolute()), file_path.stat().st_size, file_path.suffix
|
|
)
|
|
|
|
type_ = "unknown"
|
|
extension = "." + filename.split(".")[-1]
|
|
if extension in extension_types:
|
|
type_ = extension_types[extension]
|
|
|
|
await self.services.drive_item.save(drive_item)
|
|
response = "Uploaded [" + filename + "](/drive.bin/" + drive_item["uid"] + ")"
|
|
#response = "<iframe width=\"100%\" frameborder=\"0\" allowfullscreen title=\"Embedded\" src=\"" + self.base_url + "/drive.bin/" + drive_item["uid"] + "\"></iframe>\n"
|
|
response = "[" + filename + "](/drive.bin/" + drive_item["uid"] + extension + ")"
|
|
|
|
await self.services.chat.send(
|
|
self.request.session.get("uid"), channel_uid, response
|
|
)
|
|
|
|
return web.json_response({"message": "Files uploaded successfully", "files": [str(file) for file in files], "channel_uid": channel_uid})
|