import pathlib
from aiohttp import web
MAX_FILE_SIZE = 1024 * 1024 * 50 # 50Mb
UPLOAD_FOLDER_QUOTA = 10 * 1024 * 1024 * 1024 # 10Gb
UPLOAD_URL = "/"
UPLOAD_PATH = "uploads"
class Rupload(web.Application):
def __init__(
self,
upload_url: str = UPLOAD_URL,
upload_path: str = UPLOAD_PATH,
max_file_size: int = MAX_FILE_SIZE,
upload_folder_quota: int = UPLOAD_FOLDER_QUOTA,
):
self.upload_path = upload_path.rstrip("/")
self.max_file_size = max_file_size
self.upload_url = upload_url.rstrip("/")
self.upload_folder_quota = upload_folder_quota
super().__init__()
@property
def is_upload_folder_quota_reached(self):
return pathlib.Path(self.upload_path).stat().st_size > self.upload_folder_quota
def generate_upload_url(self, filename):
return self.upload_url + "/" + filename
UPLOAD_PAGE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=0.9">
<title>RUpload 1.33.7</title>
<style>
body {
font-family: "Courier New", Courier, monospace;
background-color: #111111;
display: flex;
justify-content: center;
align-items: center;
word-break: break-all;
margin: 0;
filter: invert(1);
}
a {
text-decoration: none;
}
.container {
background-color: #fff;
border-radius: 8px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
padding: 20px;
width: 100%;
max-width: 500px;
}
.container-images {
filter: invert(1);
}
h1 {
font-size: 24px;
margin-bottom: 20px;
color: #333;
}
h2 {
font-size: 20px;
margin-top: 20px;
margin-bottom: 20px;
color: #333;
clear: both;
}
form {
display: flex;
flex-direction: column;
gap: 15px;
}
input[type="file"] {
padding: 10px;
border: 2px solid #ccc;
border-radius: 5px;
font-size: 16px;
}
button {
padding: 10px 20px;
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
font-size: 16px;
cursor: pointer;
transition: background-color 0.3s;
}
button:hover {
background-color: #0056b3;
}
.message {
margin-top: 20px;
font-size: 18px;
color: #333;
}
.footer {
margin-top: 30px;
font-size: 14px;
color: #777;
}
.thumbnail {
width: 100px;
height: 100px;
object-fit: cover;
margin: 3px;
float:left;
border-radius: 5px;
}
</style>
</head>
<body>
<div class="container">
<h1>Upload file</h1>
<div class="message">
[message]
</div>
<form action="/upload" method="POST" enctype="multipart/form-data">
<input type="file" name="file" id="file" required>
<button type="submit">Upload</button>
</form>
<br />
Click <a href="#files">here</a> to see <a href="#files">uploaded files</a> (non-image) at the bottom of this page.
<h2>Uploaded images:</h2>
<div class="container-images">
[images_html]
</div>
<h2 id="files">Uploaded files:</h2>
<div class="container-file">
[files_html]
</div>
<div class="footer">
&copy; 2024 Retoor
</div>
</div>
</body>
</html>
"""
def format_size(size):
if size < 1024:
return f"{size} B"
elif size < 1024 * 1024:
return f"{size / 1024:.2f} KB"
elif size < 1024 * 1024 * 1024:
return f"{size / 1024*1024:.2f} MB"
elif size < 1024 * 1024 * 1024:
return f"{size / 1024 * 1024 * 1024:.2f} GB"
else:
return f"{size / 1024 * 1024 * 1024 * 1024:.2f} TB"
def get_images(path):
images = []
for image in pathlib.Path(path).iterdir():
if image.is_file() and image.suffix in [
".png",
".jpg",
".gif",
".jpeg",
".bmp",
".webp",
".svg",
]:
images.append(image)
return images
def get_files(path):
images = get_images(path)
files = []
for file in pathlib.Path(path).iterdir():
if file.is_file() and file not in images:
files.append(file)
return files
def create_images_html(url, image_paths):
images_html = ""
for image_path in image_paths:
path = url.rstrip("/") + "/" + image_path.name
images_html += f'<a href="{path}"><img class="thumbnail" src="{path}" alt="{image_path.name}" /></a>\n'
return images_html
def create_files_html(url, file_paths):
files_html = ""
for file_path in file_paths:
path = url.rstrip("/") + "/" + file_path.name
files_html += f'<a href="{path}">{file_path.name}</a> ({format_size(file_path.stat().st_size)})\n<br>\n'
return files_html
async def handle_upload(request: web.Request):
if request.app.is_upload_folder_quota_reached:
return web.HTTPFound(
"/?message=Server%20reached%20quota!%20Contact%20administrator."
)
reader = await request.multipart()
field = await reader.next()
app = request.app
if field.name == "file":
filename = field.filename
print(f"Attempting to upload {filename}.")
if "/" in filename:
print(f"Invalid filename: {filename}.")
return web.Response(status=400, text="Invalid filename.")
filepath = pathlib.Path(app.upload_path).joinpath(filename)
if filepath.exists():
print(f"File {filename} already exists.")
return web.HTTPFound("/?message=File%20already%20exists.")
pathlib.Path(app.upload_path).mkdir(parents=True, exist_ok=True)
with filepath.open("wb") as f:
file_size = 0
while True:
chunk = await field.read_chunk()
if not chunk:
break
file_size += len(chunk)
if file_size > app.max_file_size:
print(f"File is too large: {file_size} bytes.")
print(f"Maximum file size is {app.max_file_size} bytes.")
print(f"Deleting file {filename}.")
f.close()
f.unlink()
print(f"File {filename} deleted.")
return web.Response(
status=413,
text=f"File is too large. Maximum file size is {app.max_file_size} bytes.",
)
f.write(chunk)
print(f"File {filename} uploaded successfully.")
uploaded_url = request.app.generate_upload_url(filename)
print(f"File {filename} is now available at: {uploaded_url}.")
return web.HTTPFound(
"/?message=File is succesfully uploaded and is available here: "
+ uploaded_url
)
print("No file uploaded.")
return web.Response(status=400, text="No file uploaded.")
async def handle_index(request: web.Request):
image_paths = get_images(request.app.upload_path)
images_html = create_images_html(
url=request.app.upload_url, image_paths=image_paths
)
file_paths = get_files(request.app.upload_path)
files_html = create_files_html(url=request.app.upload_url, file_paths=file_paths)
html_content = UPLOAD_PAGE.replace("[images_html]", images_html)
html_content = html_content.replace("[files_html]", files_html)
message = request.query.get("message", "")
if request.app.is_upload_folder_quota_reached:
message = "Server reached quota! Contact administrator."
if "is available here: " in message:
url = message[message.find("here: ") + len("here: ") :]
message = message[: message.find("here: ") + len("here: ")]
message += f'<a href="{url}">'
message += f"{url}</a>"
if not message:
message = (
"Any file type is allowed thus also binary/executable or source files."
)
if message:
message += "<br /><br />"
html_content = html_content.replace("[message]", message)
return web.Response(text=html_content, content_type="text/html")
def create_app(
upload_url: str = UPLOAD_URL,
upload_path: str = UPLOAD_PATH,
max_file_size: int = MAX_FILE_SIZE,
upload_folder_quota: int = UPLOAD_FOLDER_QUOTA,
):
app = Rupload(
upload_url=upload_url,
upload_path=upload_path,
max_file_size=max_file_size,
upload_folder_quota=upload_folder_quota,
)
pathlib.Path(upload_path).mkdir(parents=True, exist_ok=True)
app.add_routes(
[
web.get("/", handle_index),
web.post("/upload", handle_upload),
web.static("/", upload_path),
web.static("/uploads", upload_path),
]
)
return app