|
#!/usr/bin/env python3
|
|
"""
@fileoverview CORS proxy server for Rantii
@author retoor <retoor@molodetz.nl>
"""
|
|
|
|
import os
|
|
import asyncio
|
|
import mimetypes
|
|
import posixpath
|
|
from urllib.parse import urlparse, unquote
|
|
from aiohttp import web, ClientSession
|
|
|
|
# Upstream API root; proxied request paths are appended to this URL.
API_BASE = 'https://dr.molodetz.nl/api/'

# Absolute directory that contains this script; static files are looked up
# relative to it.
ROOT_DIR = os.path.split(os.path.abspath(__file__))[0]

# TCP port the local server listens on.
PORT = 8101
|
|
|
|
def add_cors_headers(response):
    """Attach permissive CORS headers to *response* and return it.

    Every origin is allowed, the advertised methods are GET, POST, DELETE
    and OPTIONS, and ``Content-Type`` is the only allowed request header.
    The response is modified in place; the same object is returned so the
    call can be used directly in a ``return`` statement.
    """
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type'),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
|
|
|
|
def is_path_safe(path):
    """Return True if *path* resolves to a location inside ``ROOT_DIR``.

    Args:
        path: Request path (already URL-decoded), with or without a
            leading slash.

    Returns:
        False when the normalized path still contains a ``..`` segment or
        when the resulting absolute path is not ``ROOT_DIR`` itself or a
        descendant of it; True otherwise.
    """
    normalized = posixpath.normpath(path)

    # Inspect whole segments rather than substrings so that harmless names
    # such as "a..b.js" are not rejected. Backslashes are treated as
    # separators too, because posixpath does not split on them.
    segments = normalized.replace('\\', '/').split('/')
    if '..' in segments:
        return False

    full_path = os.path.abspath(os.path.join(ROOT_DIR, normalized.lstrip('/')))

    # A plain startswith() would accept a sibling such as "<ROOT_DIR>-other";
    # commonpath compares complete path components instead.
    try:
        common = os.path.commonpath([ROOT_DIR, full_path])
    except ValueError:
        # Raised e.g. when the two paths are on different drives.
        return False
    return os.path.normcase(common) == os.path.normcase(ROOT_DIR)
|
|
|
|
async def handle_options(request):
    """Answer an OPTIONS request with an empty 200 reply plus CORS headers.

    *request* is not inspected; every OPTIONS request gets the same reply.
    """
    preflight_reply = web.Response(status=200)
    return add_cors_headers(preflight_reply)
|
|
|
|
async def proxy_request(request, method, max_retries=10, retry_delay=2):
    """Forward an ``/api/...`` request to ``API_BASE`` and relay the reply.

    Args:
        request: Incoming aiohttp request whose path starts with ``/api/``.
        method: HTTP method to use upstream (``'GET'``, ``'POST'`` or
            ``'DELETE'``). The request body is forwarded for POST and
            DELETE only.
        max_retries: Maximum number of upstream attempts.
        retry_delay: Seconds to wait between failed attempts.

    Returns:
        A response carrying the upstream body, status and content type;
        a 400 JSON error if the path tries to climb above the API root;
        or a 503 JSON error once every attempt has failed. CORS headers
        are added in all cases.
    """
    # Drop the leading '/api/' (5 characters) to get the upstream-relative path.
    path = request.path[5:]
    parsed = urlparse(path)
    clean_path = posixpath.normpath(parsed.path).lstrip('/')

    # normpath('') returns '.', which must not be appended to the upstream URL.
    if clean_path == '.':
        clean_path = ''

    # Compare whole segments so names that merely contain ".." stay valid.
    if '..' in clean_path.split('/'):
        response = web.json_response({'success': False, 'error': 'Invalid path'}, status=400)
        return add_cors_headers(response)

    url = API_BASE + clean_path
    if request.query_string:
        url += '?' + request.query_string

    body = None
    if method in ('POST', 'DELETE'):
        body = await request.read()

    # Only the Content-Type header is passed through to the upstream API.
    headers = {}
    if 'Content-Type' in request.headers:
        headers['Content-Type'] = request.headers['Content-Type']

    # One session serves all attempts instead of building a new one per retry.
    async with ClientSession() as session:
        for attempt in range(max_retries):
            try:
                async with session.request(method, url, data=body, headers=headers) as resp:
                    data = await resp.read()
                    response = web.Response(
                        body=data,
                        status=resp.status,
                        content_type=resp.content_type,
                    )
                    return add_cors_headers(response)
            except Exception:
                # Best effort: any failure is treated as transient and retried
                # after a pause; no sleep is needed after the final attempt.
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)

    response = web.json_response({'success': False, 'error': 'Connection failed'}, status=503)
    return add_cors_headers(response)
|
|
|
|
# Hostnames the image proxy may fetch from; any other host is refused.
ALLOWED_IMAGE_HOSTS = [
    # devrant.com subdomains
    'img.devrant.com', 'avatars.devrant.com',
    # YouTube-related image hosts (presumably video thumbnails)
    'img.youtube.com', 'i.ytimg.com',
]
|
|
|
|
async def handle_image_proxy(request):
    """Fetch the image named by the ``url`` query parameter and relay it.

    Only hosts listed in ``ALLOWED_IMAGE_HOSTS`` are fetched.

    Returns:
        * 400 if ``url`` is missing, cannot be parsed, or is not http(s);
        * 403 if its host is not in ``ALLOWED_IMAGE_HOSTS``;
        * the upstream status (empty body) if the upstream answer is not 200;
        * 200 with the image bytes and a one-day ``Cache-Control`` header
          on success;
        * 503 after three failed fetch attempts.
        CORS headers are added to every response.
    """
    url = request.query.get('url')
    if not url:
        return add_cors_headers(web.Response(status=400, text='Missing url parameter'))

    try:
        parsed = urlparse(url)
        host = parsed.hostname
    except ValueError:
        return add_cors_headers(web.Response(status=400, text='Invalid URL'))

    if host not in ALLOWED_IMAGE_HOSTS:
        return add_cors_headers(web.Response(status=403, text='Host not allowed'))

    # Reject non-web schemes up front instead of letting the fetch fail and
    # be retried (with sleeps) before ending in a 503.
    if parsed.scheme not in ('http', 'https'):
        return add_cors_headers(web.Response(status=400, text='Invalid URL'))

    # NOTE(review): session.get() follows redirects by default, so a redirect
    # issued by an allowed host is followed to wherever it points — confirm
    # that this is acceptable.
    async with ClientSession() as session:
        for attempt in range(3):
            try:
                async with session.get(url) as resp:
                    if resp.status != 200:
                        return add_cors_headers(web.Response(status=resp.status))
                    data = await resp.read()
                    content_type = resp.content_type or 'image/png'
                    response = web.Response(body=data, content_type=content_type)
                    response.headers['Cache-Control'] = 'public, max-age=86400'
                    return add_cors_headers(response)
            except Exception:
                # Best effort: pause and retry; no sleep after the last attempt.
                if attempt < 2:
                    await asyncio.sleep(1)

    return add_cors_headers(web.Response(status=503, text='Failed to fetch image'))
|
|
|
|
async def handle_api_get(request):
    """Dispatch a GET under ``/api/``.

    ``/api/proxy-image`` goes to the image proxy; every other path is
    forwarded to the upstream API as a GET.
    """
    if request.path != '/api/proxy-image':
        return await proxy_request(request, 'GET')
    return await handle_image_proxy(request)
|
|
|
|
async def handle_api_post(request):
    """Forward a POST under ``/api/`` to the upstream API via ``proxy_request``."""
    upstream_reply = await proxy_request(request, method='POST')
    return upstream_reply
|
|
|
|
async def handle_api_delete(request):
    """Forward a DELETE under ``/api/`` to the upstream API via ``proxy_request``."""
    upstream_reply = await proxy_request(request, method='DELETE')
    return upstream_reply
|
|
|
|
def _read_file_bytes(file_path):
    """Return the complete contents of *file_path* as bytes (blocking)."""
    with open(file_path, 'rb') as f:
        return f.read()


async def handle_static(request):
    """Serve a file from ``ROOT_DIR`` for a GET outside ``/api/``.

    ``/`` maps to ``index.html`` and a directory maps to its own
    ``index.html``.

    Returns:
        * 403 if ``is_path_safe`` rejects the path;
        * 404 if the target is not a regular file or cannot be read;
        * 200 with the file bytes and a guessed content type
          (``application/octet-stream`` when unknown) otherwise.
        CORS headers are added to every response.
    """
    path = request.path

    if path == '/' or path.startswith('/?'):
        path = '/index.html'

    # NOTE(review): aiohttp documents request.path as already URL-decoded, so
    # this is a second decode — confirm it is intended. The safety check below
    # runs on the fully decoded value either way.
    path = unquote(path).lstrip('/')

    if not is_path_safe(path):
        return add_cors_headers(web.Response(status=403, text='Forbidden'))

    full_path = os.path.join(ROOT_DIR, path)

    if os.path.isdir(full_path):
        full_path = os.path.join(full_path, 'index.html')

    if not os.path.isfile(full_path):
        return add_cors_headers(web.Response(status=404, text='Not Found'))

    content_type, _ = mimetypes.guess_type(full_path)
    if content_type is None:
        content_type = 'application/octet-stream'

    # Read the file in the default executor so a slow disk does not stall the
    # event loop and with it every other in-flight request.
    loop = asyncio.get_event_loop()
    try:
        data = await loop.run_in_executor(None, _read_file_bytes, full_path)
    except OSError:
        # The file vanished or became unreadable after the isfile() check.
        return add_cors_headers(web.Response(status=404, text='Not Found'))

    response = web.Response(body=data, content_type=content_type)
    return add_cors_headers(response)
|
|
|
|
def create_app():
    """Build the aiohttp application and register all routes.

    Registration order: the catch-all OPTIONS handler, the three ``/api/``
    method handlers, then the static-file handler for every remaining GET.
    """
    app = web.Application()
    router = app.router

    router.add_route('OPTIONS', '/{path:.*}', handle_options)

    api_pattern = '/api/{path:.*}'
    api_routes = (
        (router.add_get, handle_api_get),
        (router.add_post, handle_api_post),
        (router.add_delete, handle_api_delete),
    )
    for register, handler in api_routes:
        register(api_pattern, handler)

    for static_pattern in ('/{path:.*}', '/'):
        router.add_get(static_pattern, handle_static)

    return app
|
|
|
|
if __name__ == '__main__':
    # Print our own startup banner; print=None is passed to run_app so that
    # it does not print one of its own.
    for banner_line in (
        f'Server running at http://localhost:{PORT}',
        'API proxy at /api/*',
    ):
        print(banner_line)
    web.run_app(create_app(), host='localhost', port=PORT, print=None)
|