from fastapi import FastAPI, HTTPException, Depends, Form, File, UploadFile, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, EmailStr
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime, timedelta
import hashlib
import secrets
import json
import os
from pathlib import Path
import asyncio
from ads import AsyncDataSet
# Application object; every route below is registered on it.
app = FastAPI(title="Rant Community API")
# Enable CORS
# NOTE(review): every origin, method and header is allowed while credentials
# are enabled as well — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Database setup
DB_PATH = "rant_community.db"  # handed to AsyncDataSet below
UPLOAD_DIR = Path("uploads")  # images attached to rants/comments are written here
# Created at import time; exist_ok keeps repeated starts from failing.
UPLOAD_DIR.mkdir(exist_ok=True)
# Template setup
templates = Jinja2Templates(directory="templates")
# Initialize AsyncDataSet
db = AsyncDataSet(DB_PATH)
async def init_db():
    """Create the tables used by the application if they are missing.

    Issues one ``CREATE TABLE IF NOT EXISTS`` statement per table through
    ``db.query_raw``: users, auth_tokens, rants, comments, votes, favorites
    and notifications, in that order.

    ``IF NOT EXISTS`` makes the function idempotent. It is called on every
    application startup, and a plain ``CREATE TABLE`` would be re-issued for
    tables that already exist whenever the database file is reused.
    """
    # Users table
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
score INTEGER DEFAULT 0,
about TEXT DEFAULT '',
location TEXT DEFAULT '',
skills TEXT DEFAULT '',
github TEXT DEFAULT '',
website TEXT DEFAULT '',
created_time INTEGER NOT NULL,
avatar_b TEXT DEFAULT '7bc8a4',
avatar_i TEXT,
updated_at TEXT,
uid TEXT,
created_at TEXT,
deleted_at TEXT
);
""")
    # Login tokens issued by the login endpoints
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS auth_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
token_key TEXT UNIQUE NOT NULL,
expire_time INTEGER NOT NULL,
uid TEXT,
created_at TEXT,
updated_at TEXT,
deleted_at TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
);
""")
    # Rants
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS rants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
text TEXT NOT NULL,
score INTEGER DEFAULT 0,
created_time INTEGER NOT NULL,
attached_image TEXT DEFAULT '',
tags TEXT DEFAULT '',
edited BOOLEAN DEFAULT 0,
type INTEGER DEFAULT 1,
updated_at TEXT,
uid TEXT,
created_at TEXT,
deleted_at TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
);
""")
    # Comments on rants
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
rant_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
body TEXT NOT NULL,
score INTEGER DEFAULT 0,
created_time INTEGER NOT NULL,
attached_image TEXT DEFAULT '',
edited BOOLEAN DEFAULT 0,
uid TEXT,
created_at TEXT,
updated_at TEXT,
deleted_at TEXT,
FOREIGN KEY (rant_id) REFERENCES rants (id),
FOREIGN KEY (user_id) REFERENCES users (id)
);
""")
    # Votes; one row per (user, target, target type)
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS votes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
target_id INTEGER NOT NULL,
target_type TEXT NOT NULL,
vote INTEGER NOT NULL,
reason INTEGER,
uid TEXT,
created_at TEXT,
updated_at TEXT,
deleted_at TEXT,
UNIQUE(user_id, target_id, target_type),
FOREIGN KEY (user_id) REFERENCES users (id)
);
""")
    # Favorites; one row per (user, rant)
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS favorites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
rant_id INTEGER NOT NULL,
uid TEXT,
created_at TEXT,
updated_at TEXT,
deleted_at TEXT,
UNIQUE(user_id, rant_id),
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (rant_id) REFERENCES rants (id)
);
""")
    # Notifications
    await db.query_raw("""
CREATE TABLE IF NOT EXISTS notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
type TEXT NOT NULL,
rant_id INTEGER,
comment_id INTEGER,
from_user_id INTEGER,
created_time INTEGER NOT NULL,
read BOOLEAN DEFAULT 0,
uid TEXT,
created_at TEXT,
updated_at TEXT,
deleted_at TEXT,
FOREIGN KEY (user_id) REFERENCES users (id)
);
""")
# Run init_db on startup
@app.on_event("startup")
async def startup_event():
    """Startup hook registered on the app: runs init_db() to set up the tables."""
    await init_db()
# Create necessary directories
# exist_ok=True keeps these calls harmless when the directories already exist.
# "templates" is created before "templates/components" because mkdir is
# called without parents=True.
Path("templates").mkdir(exist_ok=True)
Path("templates/components").mkdir(exist_ok=True)
Path("static").mkdir(exist_ok=True)
# Pydantic models
class UserRegister(BaseModel):
    """Payload model for registering an account.

    NOTE(review): not referenced by any handler in the visible part of the
    file; the registration endpoint reads form fields instead.
    """
    # Declared as EmailStr, unlike the plain-str form field of the endpoint.
    email: EmailStr
    username: str
    password: str
class UserLogin(BaseModel):
    """Payload model carrying login credentials.

    NOTE(review): not referenced by any handler in the visible part of the file.
    """
    username: str
    password: str
class RantCreate(BaseModel):
    """Payload model for creating a rant.

    NOTE(review): not referenced by any handler in the visible part of the file.
    """
    rant: str
    # The create endpoint splits its own `tags` form field on commas;
    # presumably the same format is expected here — confirm.
    tags: str
    # Defaults to 1, matching the default of the rants.type column.
    type: int = 1
class CommentCreate(BaseModel):
    """Payload model for creating a comment.

    NOTE(review): not referenced by any handler in the visible part of the file.
    """
    comment: str
class VoteRequest(BaseModel):
    """Payload model for a vote.

    NOTE(review): not referenced by any handler in the visible part of the file.
    """
    # Only -1, 0 and 1 are accepted; the vote endpoint treats 0 as "remove my vote".
    vote: Literal[-1, 0, 1]
    # Optional integer code stored with the vote; its meaning is not defined here.
    reason: Optional[int] = None
# Helper functions
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of ``password``.

    The string is encoded with the default ``str.encode()`` codec (UTF-8)
    before hashing.

    NOTE(review): this is a single unsalted SHA-256 pass. A dedicated
    password KDF would be stronger, but stored hashes depend on this exact
    output, so it is left unchanged here.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def generate_token() -> str:
    """Return a fresh random URL-safe token built from 32 random bytes."""
    entropy_bytes = 32
    return secrets.token_urlsafe(entropy_bytes)
def format_time(timestamp):
    """Render a Unix timestamp as a short, human-readable age.

    Returns ``'just now'`` for anything younger than a minute (including
    timestamps in the future), then whole minutes, hours or days followed by
    ``' ago'`` up to one week, and a ``YYYY-MM-DD`` date for anything older.
    Both the timestamp and "now" are interpreted as naive local time.
    """
    moment = datetime.fromtimestamp(timestamp)
    elapsed = (datetime.now() - moment).total_seconds()
    if elapsed < 60:
        return 'just now'
    # (exclusive upper bound in seconds, seconds per unit, unit suffix)
    steps = (
        (3600, 60, 'm'),
        (86400, 3600, 'h'),
        (604800, 86400, 'd'),
    )
    for upper_bound, unit_seconds, suffix in steps:
        if elapsed < upper_bound:
            return f'{int(elapsed / unit_seconds)}{suffix} ago'
    return moment.strftime('%Y-%m-%d')
# Make format_time callable from every template rendered through `templates`.
templates.env.globals['format_time'] = format_time
# Add template context processors
def escape_html(text):
    """Escape the five HTML-significant characters in ``text``.

    ``&``, ``<``, ``>``, ``"`` and ``'`` become ``&amp;``, ``&lt;``,
    ``&gt;``, ``&quot;`` and ``&#39;``. Each input character is mapped exactly
    once, so an ampersand that is already part of an entity is escaped again.
    """
    replacements = str.maketrans({
        '&': '&amp;',
        '<': '&lt;',
        '>': '&gt;',
        '"': '&quot;',
        "'": '&#39;',
    })
    return text.translate(replacements)
# Make escape_html callable from every template rendered through `templates`.
templates.env.globals['escape_html'] = escape_html
# Authentication helpers
async def get_current_user_from_cookie(request: Request):
    """Get current user from session cookie.

    The ``auth_token`` cookie is expected to hold a JSON object with the keys
    ``id``, ``key`` and ``user_id``. The matching row in ``auth_tokens`` must
    exist and must not be expired.

    Returns:
        The user row with ``token_id`` and ``token_key`` added, or ``None``
        when the cookie is absent, malformed, unknown or expired, when the
        user no longer exists, or when the lookup fails.
    """
    auth_token = request.cookies.get("auth_token")
    if not auth_token:
        return None
    try:
        token_data = json.loads(auth_token)
        token = await db.get("auth_tokens", {
            "id": token_data['id'],
            "token_key": token_data['key'],
            "user_id": token_data['user_id']
        })
        if not token or token['expire_time'] <= int(datetime.now().timestamp()):
            return None
        user = await db.get("users", {"id": token_data['user_id']})
        if user:
            # Add token info to user object for client-side API calls
            user['token_id'] = token_data['id']
            user['token_key'] = token_data['key']
        return user
    except Exception:
        # Best effort: any failure means "not logged in". Catching Exception
        # rather than everything lets task cancellation and interpreter
        # shutdown signals (BaseException subclasses) propagate.
        return None
async def authenticate_user(token_id: Optional[int] = None,
                            token_key: Optional[str] = None,
                            user_id: Optional[int] = None):
    """Validate an (id, key, user) token triple against ``auth_tokens``.

    Returns ``user_id`` when a matching, unexpired token row exists.
    Returns ``None`` when any of the three values is missing or falsy, when
    no row matches, or when the token's ``expire_time`` is not in the future.
    """
    credentials = (token_id, token_key, user_id)
    if any(not part for part in credentials):
        return None
    stored = await db.get("auth_tokens", {
        "id": token_id,
        "token_key": token_key,
        "user_id": user_id
    })
    if not stored:
        return None
    now_ts = int(datetime.now().timestamp())
    if stored['expire_time'] <= now_ts:
        return None
    return user_id
async def format_rant(rant_row, user_row, current_user_id=None):
    """Build the response dict for one rant.

    Args:
        rant_row: mapping with the rant's ``id``, ``text``, ``score``,
            ``created_time``, ``attached_image``, ``tags``, ``edited`` and
            ``type``.
        user_row: mapping with the author's ``id``, ``username``, ``score``,
            ``avatar_b`` and ``avatar_i``.
        current_user_id: viewer id used to look up the viewer's own vote;
            falsy for anonymous viewers.

    Returns:
        A dict combining the rant fields, its comment count, the decoded tag
        list, the viewer's vote (0 when none) and the author summary.
    """
    rant_id = rant_row['id']
    num_comments = await db.count("comments", {"rant_id": rant_id})

    # Anonymous viewers never trigger the vote lookup.
    own_vote = None
    if current_user_id:
        own_vote = await db.get("votes", {
            "user_id": current_user_id,
            "target_id": rant_id,
            "target_type": "rant"
        })
    vote_state = own_vote['vote'] if own_vote else 0

    # Tags are stored as a JSON string; an empty value means no tags.
    raw_tags = rant_row['tags']
    tags = json.loads(raw_tags) if raw_tags else []

    avatar = {
        "b": user_row['avatar_b'],
        "i": user_row['avatar_i'] or ""
    }
    formatted = {
        "id": rant_id,
        "text": rant_row['text'],
        "score": rant_row['score'],
        "created_time": rant_row['created_time'],
        "attached_image": rant_row['attached_image'],
        "num_comments": num_comments,
        "tags": tags,
        "vote_state": vote_state,
        "edited": bool(rant_row['edited']),
        "rt": rant_row['type'],
        "rc": 1,
        "user_id": user_row['id'],
        "user_username": user_row['username'],
        "user_score": user_row['score'],
        # Two independent copies so the entries do not alias each other.
        "user_avatar": dict(avatar),
        "user_avatar_lg": dict(avatar)
    }
    return formatted
async def format_comment(comment_row, user_row, current_user_id=None):
    """Build the response dict for one comment.

    Args:
        comment_row: mapping with the comment's ``id``, ``rant_id``, ``body``,
            ``score`` and ``created_time``.
        user_row: mapping with the author's ``id``, ``username``, ``score``,
            ``avatar_b`` and ``avatar_i``.
        current_user_id: viewer id used to look up the viewer's own vote;
            falsy for anonymous viewers.

    Returns:
        A dict with the comment fields, the viewer's vote (0 when none) and
        the author summary.
    """
    comment_id = comment_row['id']

    # Anonymous viewers never trigger the vote lookup.
    own_vote = None
    if current_user_id:
        own_vote = await db.get("votes", {
            "user_id": current_user_id,
            "target_id": comment_id,
            "target_type": "comment"
        })
    vote_state = own_vote['vote'] if own_vote else 0

    formatted = {
        "id": comment_id,
        "rant_id": comment_row['rant_id'],
        "body": comment_row['body'],
        "score": comment_row['score'],
        "created_time": comment_row['created_time'],
        "vote_state": vote_state,
        "user_id": user_row['id'],
        "user_username": user_row['username'],
        "user_score": user_row['score'],
        "user_avatar": {
            "b": user_row['avatar_b'],
            "i": user_row['avatar_i'] or ""
        }
    }
    return formatted
# SSR Routes
@app.get("/", response_class=HTMLResponse)
async def home(request: Request, sort: str = "recent"):
    """Render the feed page (``feed.html``).

    Loads up to 50 rants joined with their authors, newest first when
    ``sort`` is ``"recent"`` and by score for any other value. Each row is
    formatted with ``format_rant`` and passed to the template together with
    the signed-in user (if any) and that user's unread-notification count.
    """
    viewer = await get_current_user_from_cookie(request)
    viewer_id = viewer['id'] if viewer else None

    # Only these two fixed fragments are ever interpolated into the SQL;
    # the raw `sort` value never reaches the query text.
    if sort == "recent":
        order_by = "r.created_time DESC"
    else:
        order_by = "r.score DESC"
    feed_rows = await db.query_raw(
        f"""SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
ORDER BY {order_by}
LIMIT 50 OFFSET 0""",
        ()
    )

    # Column names copied verbatim for the rant, and
    # (output key -> joined column) for the author.
    rant_keys = ('id', 'text', 'score', 'created_time',
                 'attached_image', 'tags', 'edited', 'type')
    author_keys = {'id': 'user_id', 'username': 'username', 'score': 'user_score',
                   'avatar_b': 'avatar_b', 'avatar_i': 'avatar_i'}
    rants = [
        await format_rant(
            {key: row[key] for key in rant_keys},
            {out: row[col] for out, col in author_keys.items()},
            viewer_id,
        )
        for row in feed_rows
    ]

    # Unread notifications are only counted for a signed-in viewer.
    notif_count = (
        await db.count("notifications", {"user_id": viewer['id'], "read": 0})
        if viewer else 0
    )
    context = {
        "request": request,
        "current_user": viewer,
        "notif_count": notif_count,
        "rants": rants,
        "current_sort": sort,
        "escape_html": escape_html,
        "format_time": format_time
    }
    return templates.TemplateResponse("feed.html", context)
@app.get("/rant/{rant_id}", response_class=HTMLResponse)
async def rant_detail(request: Request, rant_id: int):
    """Render the detail page (``rant_detail.html``) for one rant.

    Loads the rant with its author and all of its comments, oldest first,
    plus whether the viewer has favorited it and the viewer's
    unread-notification count.

    Raises:
        HTTPException: 404 when no rant has the given id.
    """
    viewer = await get_current_user_from_cookie(request)
    viewer_id = viewer['id'] if viewer else None

    # (output key -> joined column) for the author part of a joined row.
    author_keys = {'id': 'user_id', 'username': 'username', 'score': 'user_score',
                   'avatar_b': 'avatar_b', 'avatar_i': 'avatar_i'}

    def author_of(row):
        return {out: row[col] for out, col in author_keys.items()}

    # Get rant with user info
    found = await db.query_one(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?""",
        (rant_id,)
    )
    if not found:
        raise HTTPException(status_code=404, detail="Rant not found")
    rant_keys = ('id', 'text', 'score', 'created_time',
                 'attached_image', 'tags', 'edited', 'type')
    rant = await format_rant(
        {key: found[key] for key in rant_keys}, author_of(found), viewer_id
    )

    # Get comments
    comment_rows = await db.query_raw(
        """SELECT c.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM comments c
JOIN users u ON c.user_id = u.id
WHERE c.rant_id = ?
ORDER BY c.created_time ASC""",
        (rant_id,)
    )
    comment_keys = ('id', 'rant_id', 'body', 'score', 'created_time')
    comments = [
        await format_comment(
            {key: row[key] for key in comment_keys}, author_of(row), viewer_id
        )
        for row in comment_rows
    ]

    # Check if subscribed (favorited); anonymous viewers never are.
    subscribed = 1 if viewer_id and await db.exists(
        "favorites", {"user_id": viewer_id, "rant_id": rant_id}
    ) else 0

    # Get notification count
    notif_count = (
        await db.count("notifications", {"user_id": viewer['id'], "read": 0})
        if viewer else 0
    )
    context = {
        "request": request,
        "current_user": viewer,
        "notif_count": notif_count,
        "rant": rant,
        "comments": comments,
        "subscribed": subscribed,
        "escape_html": escape_html,
        "format_time": format_time
    }
    return templates.TemplateResponse("rant_detail.html", context)
@app.get("/profile/{user_id}", response_class=HTMLResponse)
async def profile(request: Request, user_id: int, tab: str = "rants"):
    """Render a user's profile page (``profile.html``).

    Loads the user plus up to 50 each of their rants, their comments and the
    rants they favorited. All three lists are always loaded; ``tab`` is only
    passed to the template as ``active_tab``.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    viewer = await get_current_user_from_cookie(request)
    viewer_id = viewer['id'] if viewer else None

    # Get user
    user = await db.get("users", {"id": user_id})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    rant_keys = ('id', 'text', 'score', 'created_time',
                 'attached_image', 'tags', 'edited', 'type')
    comment_keys = ('id', 'rant_id', 'body', 'score', 'created_time')
    # (output key -> joined column) for the author part of a joined row.
    author_keys = {'id': 'user_id', 'username': 'username', 'score': 'user_score',
                   'avatar_b': 'avatar_b', 'avatar_i': 'avatar_i'}

    def author_of(row):
        return {out: row[col] for out, col in author_keys.items()}

    async def as_rants(rows):
        # Format joined rant rows one after another, preserving query order.
        return [
            await format_rant({key: row[key] for key in rant_keys}, author_of(row), viewer_id)
            for row in rows
        ]

    # Get user's rants
    own_rant_rows = await db.query_raw(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
WHERE r.user_id = ?
ORDER BY r.created_time DESC
LIMIT 50""",
        (user_id,)
    )
    rants = await as_rants(own_rant_rows)

    # Get user's comments
    own_comment_rows = await db.query_raw(
        """SELECT c.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM comments c
JOIN users u ON c.user_id = u.id
WHERE c.user_id = ?
ORDER BY c.created_time DESC
LIMIT 50""",
        (user_id,)
    )
    comments = [
        await format_comment({key: row[key] for key in comment_keys}, author_of(row), viewer_id)
        for row in own_comment_rows
    ]

    # Get favorited rants
    favorite_rows = await db.query_raw(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
JOIN favorites f ON f.rant_id = r.id
WHERE f.user_id = ?
ORDER BY f.id DESC
LIMIT 50""",
        (user_id,)
    )
    favorites = await as_rants(favorite_rows)

    # Get notification count
    notif_count = (
        await db.count("notifications", {"user_id": viewer['id'], "read": 0})
        if viewer else 0
    )

    # Only these fields of the user row are handed to the template as "profile".
    profile_data = {
        field: user[field]
        for field in ("username", "score", "about", "location",
                      "created_time", "skills", "github", "website")
    }
    profile_data["avatar"] = {
        "b": user['avatar_b'],
        "i": user['avatar_i'] or ""
    }
    context = {
        "request": request,
        "current_user": viewer,
        "notif_count": notif_count,
        "profile": profile_data,
        "profile_user_id": user_id,
        "rants": rants,
        "comments": comments,
        "favorites": favorites,
        "active_tab": tab,
        "escape_html": escape_html,
        "format_time": format_time
    }
    return templates.TemplateResponse("profile.html", context)
@app.get("/search", response_class=HTMLResponse)
async def search_page(request: Request, term: str = None):
    """Render the search page (``search.html``).

    With a non-empty ``term``, returns up to 50 rants whose text or stored
    tags contain it as a substring, highest score first. Without a term the
    result list is empty and no search query is run.
    """
    viewer = await get_current_user_from_cookie(request)
    viewer_id = viewer['id'] if viewer else None

    results = []
    if term:
        # Search rants; the term is bound as a parameter, wrapped in % wildcards.
        pattern = f'%{term}%'
        matches = await db.query_raw(
            """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
WHERE r.text LIKE ? OR r.tags LIKE ?
ORDER BY r.score DESC
LIMIT 50""",
            (pattern, pattern)
        )
        rant_keys = ('id', 'text', 'score', 'created_time',
                     'attached_image', 'tags', 'edited', 'type')
        # (output key -> joined column) for the author part of a joined row.
        author_keys = {'id': 'user_id', 'username': 'username', 'score': 'user_score',
                       'avatar_b': 'avatar_b', 'avatar_i': 'avatar_i'}
        for row in matches:
            rant_part = {key: row[key] for key in rant_keys}
            author_part = {out: row[col] for out, col in author_keys.items()}
            results.append(await format_rant(rant_part, author_part, viewer_id))

    # Get notification count
    notif_count = (
        await db.count("notifications", {"user_id": viewer['id'], "read": 0})
        if viewer else 0
    )
    context = {
        "request": request,
        "current_user": viewer,
        "notif_count": notif_count,
        "search_term": term,
        "results": results,
        "escape_html": escape_html,
        "format_time": format_time
    }
    return templates.TemplateResponse("search.html", context)
@app.get("/notifications", response_class=HTMLResponse)
async def notifications(request: Request):
    """Render the signed-in user's notifications (``notifications.html``).

    Anonymous visitors are redirected to ``/``. Otherwise the 50 most recent
    notifications are listed, and if any were found, every notification
    belonging to the user is marked as read.
    """
    viewer = await get_current_user_from_cookie(request)
    if not viewer:
        return RedirectResponse("/", status_code=302)

    # Get notifications; LEFT JOIN keeps rows whose sender no longer resolves.
    found = await db.query_raw(
        """SELECT n.*, u.username
FROM notifications n
LEFT JOIN users u ON n.from_user_id = u.id
WHERE n.user_id = ?
ORDER BY n.created_time DESC
LIMIT 50""",
        (viewer['id'],)
    )
    items = [
        {
            "type": entry['type'],
            "rant_id": entry['rant_id'],
            "comment_id": entry['comment_id'],
            "created_time": entry['created_time'],
            "read": entry['read'],
            "uid": entry['from_user_id'],
            "username": entry['username'] or ""
        }
        for entry in found
    ]

    # Mark notifications as read (all of the user's, not only the listed ones).
    if found:
        await db.update("notifications", {"read": 1}, {"user_id": viewer['id']})

    context = {
        "request": request,
        "current_user": viewer,
        "notif_count": 0,  # We just marked them as read
        "items": items,
        "format_time": format_time
    }
    return templates.TemplateResponse("notifications.html", context)
@app.get("/classic", response_class=HTMLResponse)
async def classic():
    """Serve the file ``classic.html``, resolved against the working directory."""
    page = FileResponse("classic.html")
    return page
# Authentication endpoints with cookie support
@app.post("/login")
async def login_form(
    request: Request,
    response: Response,
    username: str = Form(...),
    password: str = Form(...)
):
    """Handle the HTML login form and start a cookie session.

    ``username`` may be either the username or the e-mail address. On bad
    credentials ``login.html`` is re-rendered with an error message. On
    success a 30-day token is stored in ``auth_tokens`` and written as JSON
    into the ``auth_token`` cookie of a redirect to ``/``.
    """
    # Find user by username or email
    user = await db.query_one(
        "SELECT * FROM users WHERE username = ? OR email = ?",
        (username, username)
    )
    # compare_digest takes the same time wherever the first mismatch is, so
    # the comparison leaks nothing about the stored hash.
    if not user or not secrets.compare_digest(user['password_hash'], hash_password(password)):
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": "Invalid login credentials entered. Please try again."
        })
    # Create auth token
    token_key = generate_token()
    expire_time = int((datetime.now() + timedelta(days=30)).timestamp())
    token_id = await db.insert("auth_tokens", {
        "user_id": user['id'],
        "token_key": token_key,
        "expire_time": expire_time
    }, return_id=True)
    # Set cookie
    auth_token = {
        "id": token_id,
        "key": token_key,
        "expire_time": expire_time,
        "user_id": user['id']
    }
    # The injected `response` is replaced: the cookie must be set on the
    # redirect that is actually returned.
    response = RedirectResponse("/", status_code=302)
    # max_age matches the 30-day token lifetime.
    response.set_cookie("auth_token", json.dumps(auth_token), max_age=30*24*60*60)
    return response
@app.get("/logout")
async def logout(response: Response):
    """Clear the ``auth_token`` cookie and redirect to ``/``.

    Only the browser cookie is removed; the token row in ``auth_tokens`` is
    left untouched.
    """
    # The injected `response` is not used: the returned redirect carries the
    # cookie deletion.
    redirect = RedirectResponse("/", status_code=302)
    redirect.delete_cookie("auth_token")
    return redirect
# REST API Endpoints (keeping all existing endpoints)
@app.post("/api/users")
async def register_user(
    email: str = Form(...),
    username: str = Form(...),
    password: str = Form(...),
    type: int = Form(1),
    app: int = Form(3)
):
    """Register a new account from form fields.

    The username must be 4 to 15 characters long, and neither it nor the
    e-mail may already be registered. ``type`` and ``app`` are accepted but
    not used.

    Returns:
        ``{"success": True}`` on success; otherwise a dict with
        ``success: False`` and an ``error`` message. Validation failures also
        carry ``error_field``; an insert failure returns the exception text.
    """
    def rejected(message, field):
        return {"success": False, "error": message, "error_field": field}

    # Validate username length
    if not 4 <= len(username) <= 15:
        return rejected("Your username must be between 4 and 15 characters.", "username")
    # Check if username exists
    if await db.exists("users", {"username": username}):
        return rejected("Username already taken.", "username")
    # Check if email exists
    if await db.exists("users", {"email": email}):
        return rejected("Email already registered.", "email")

    # Create user
    new_user = {
        "username": username,
        "email": email,
        "password_hash": hash_password(password),
        "created_time": int(datetime.now().timestamp())
    }
    try:
        await db.insert("users", new_user, return_id=True)
    except Exception as e:
        return {"success": False, "error": str(e)}
    return {"success": True}
@app.post("/api/users/auth-token")
async def login(
    username: str = Form(...),
    password: str = Form(...),
    app: int = Form(3)
):
    """Issue an API auth token for a username/e-mail and password.

    ``username`` may be either the username or the e-mail address. ``app``
    is accepted but not used.

    Returns:
        On success, ``{"success": True, "auth_token": {...}}`` holding the
        token's ``id``, ``key``, ``expire_time`` (30 days ahead) and
        ``user_id``. On bad credentials, ``{"success": False, "error": ...}``.
    """
    # Find user by username or email
    user = await db.query_one(
        "SELECT * FROM users WHERE username = ? OR email = ?",
        (username, username)
    )
    # compare_digest takes the same time wherever the first mismatch is, so
    # the comparison leaks nothing about the stored hash.
    if not user or not secrets.compare_digest(user['password_hash'], hash_password(password)):
        return {
            "success": False,
            "error": "Invalid login credentials entered. Please try again."
        }
    # Create auth token
    token_key = generate_token()
    expire_time = int((datetime.now() + timedelta(days=30)).timestamp())
    token_id = await db.insert("auth_tokens", {
        "user_id": user['id'],
        "token_key": token_key,
        "expire_time": expire_time
    }, return_id=True)
    return {
        "success": True,
        "auth_token": {
            "id": token_id,
            "key": token_key,
            "expire_time": expire_time,
            "user_id": user['id']
        }
    }
@app.get("/api/rant/rants")
async def get_rants(
    sort: str = "recent",
    limit: int = 20,
    skip: int = 0,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return one page of the rant feed as JSON.

    Args:
        sort: ``"recent"`` orders by creation time; any other value orders by
            score. Both are descending.
        limit: page size; negative values are treated as 0.
        skip: number of rows to skip; negative values are treated as 0.
        app: accepted but not used.
        token_id, token_key, user_id: optional credentials. When valid, each
            rant's ``vote_state`` reflects that user's vote.

    Returns:
        A dict with ``success``, the formatted ``rants`` and a set of fixed
        placeholder fields (``settings``, ``news``, ...), plus a random
        ``set`` value.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None
    # In SQLite a negative LIMIT means "no limit", which would let a client
    # pull the whole table in one request. Clamp both paging values.
    limit = max(limit, 0)
    skip = max(skip, 0)
    # Get rants with user info; only two fixed ORDER BY fragments can reach the SQL.
    order_by = "r.created_time DESC" if sort == "recent" else "r.score DESC"
    rows = await db.query_raw(
        f"""SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
ORDER BY {order_by}
LIMIT ? OFFSET ?""",
        (limit, skip)
    )
    rants = []
    for row in rows:
        rant_data = {
            'id': row['id'],
            'text': row['text'],
            'score': row['score'],
            'created_time': row['created_time'],
            'attached_image': row['attached_image'],
            'tags': row['tags'],
            'edited': row['edited'],
            'type': row['type']
        }
        user_data = {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i']
        }
        rants.append(await format_rant(rant_data, user_data, current_user_id))
    return {
        "success": True,
        "rants": rants,
        "settings": {"notif_state": -1, "notif_token": ""},
        "set": secrets.token_hex(7),
        "wrw": 385,
        "dpp": 0,
        "num_notifs": 0,
        "unread": {"total": 0},
        "news": {
            "id": 356,
            "type": "intlink",
            "headline": "Weekly Group Rant",
            "body": "Share your thoughts!",
            "footer": "Add tag 'wk247' to your rant",
            "height": 100,
            "action": "grouprant"
        }
    }
@app.get("/api/rant/rants/{rant_id}")
async def get_rant(
    rant_id: int,
    app: int = 3,
    last_comment_id: Optional[int] = None,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return one rant with all of its comments as JSON.

    ``app`` and ``last_comment_id`` are accepted but not used. Valid
    credentials make ``vote_state`` and ``subscribed`` reflect that user.

    Raises:
        HTTPException: 404 when no rant has the given id.
    """
    viewer_id = await authenticate_user(token_id, token_key, user_id) if token_id else None

    # (output key -> joined column) for the author part of a joined row.
    author_keys = {'id': 'user_id', 'username': 'username', 'score': 'user_score',
                   'avatar_b': 'avatar_b', 'avatar_i': 'avatar_i'}

    def author_of(row):
        return {out: row[col] for out, col in author_keys.items()}

    # Get rant with user info
    found = await db.query_one(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?""",
        (rant_id,)
    )
    if not found:
        raise HTTPException(status_code=404, detail="Rant not found")
    rant_keys = ('id', 'text', 'score', 'created_time',
                 'attached_image', 'tags', 'edited', 'type')
    rant = await format_rant(
        {key: found[key] for key in rant_keys}, author_of(found), viewer_id
    )

    # Get comments
    comment_rows = await db.query_raw(
        """SELECT c.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM comments c
JOIN users u ON c.user_id = u.id
WHERE c.rant_id = ?
ORDER BY c.created_time ASC""",
        (rant_id,)
    )
    comment_keys = ('id', 'rant_id', 'body', 'score', 'created_time')
    comments = [
        await format_comment(
            {key: row[key] for key in comment_keys}, author_of(row), viewer_id
        )
        for row in comment_rows
    ]

    # Check if subscribed (favorited); anonymous callers never are.
    subscribed = 1 if viewer_id and await db.exists(
        "favorites", {"user_id": viewer_id, "rant_id": rant_id}
    ) else 0

    # Add link to rant: first 50 characters of the text, spaces turned into dashes.
    slug = rant['text'][:50].replace(' ', '-')
    rant['link'] = f"rants/{rant_id}/{slug}"
    return {
        "rant": rant,
        "comments": comments,
        "success": True,
        "subscribed": subscribed
    }
@app.post("/api/rant/rants")
async def create_rant(
    rant: str = Form(...),
    tags: str = Form(...),
    type: int = Form(1),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...),
    image: Optional[UploadFile] = File(None)
):
    """Create a rant for the authenticated user.

    Args:
        rant: rant text.
        tags: comma-separated tags; stored as a JSON list with blanks dropped.
        type: stored in the rant's ``type`` column.
        app: accepted but not used.
        token_id, token_key, user_id: credentials, all required.
        image: optional upload, saved under ``UPLOAD_DIR`` with a random name.

    Returns:
        ``{"success": True, "rant_id": ...}`` on success, or a dict with
        ``success: False`` and an ``error`` when authentication fails or the
        same text was posted by this user within the last 5 minutes.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    # Check for duplicate rant (same user, same text, last 300 seconds)
    recent_time = int(datetime.now().timestamp()) - 300
    duplicate = await db.query_one(
        "SELECT id FROM rants WHERE user_id = ? AND text = ? AND created_time > ?",
        (current_user_id, rant, recent_time)
    )
    if duplicate:
        return {
            "success": False,
            "error": "It looks like you just posted this same rant! Your connection might have timed out while posting so you might have seen an error, but sometimes the rant still gets posted and in this case it seems it did, so please check :) If this was not the case please contact info@rant.io. Thanks!"
        }
    # Handle image upload
    image_path = ""
    # An upload part can arrive with a missing or empty filename. splitext()
    # would raise on None and an empty name would store a junk file, so such
    # parts are treated as "no image".
    if image and image.filename:
        # NOTE(review): the extension comes from the client-supplied filename
        # and is kept as-is — consider restricting it to known image types.
        file_ext = os.path.splitext(image.filename)[1]
        file_name = f"{secrets.token_hex(16)}{file_ext}"
        (UPLOAD_DIR / file_name).write_bytes(await image.read())
        image_path = f"/uploads/{file_name}"
    # Create rant
    created_time = int(datetime.now().timestamp())
    tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
    rant_id = await db.insert("rants", {
        "user_id": current_user_id,
        "text": rant,
        "created_time": created_time,
        "attached_image": image_path,
        "tags": json.dumps(tags_list),
        "type": type
    }, return_id=True)
    return {"success": True, "rant_id": rant_id}
@app.post("/api/rant/rants/{rant_id}")
async def update_rant(
    rant_id: int,
    rant: str = Form(...),
    tags: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Replace the text and tags of a rant owned by the authenticated user.

    The rant is also flagged as edited. ``app`` is accepted but not used.

    Returns:
        ``{"success": True}`` on success; ``success: False`` with ``error``
        when authentication fails, or with ``fail_reason`` when the rant is
        missing or belongs to someone else.
    """
    editor_id = await authenticate_user(token_id, token_key, user_id)
    if not editor_id:
        return {"success": False, "error": "Authentication required"}

    # Check ownership; a missing rant is reported the same way as a foreign one.
    existing = await db.get("rants", {"id": rant_id})
    is_owner = bool(existing) and existing['user_id'] == editor_id
    if not is_owner:
        return {"success": False, "fail_reason": "Unauthorized"}

    # Update rant
    cleaned_tags = []
    for piece in tags.split(','):
        piece = piece.strip()
        if piece:
            cleaned_tags.append(piece)
    changes = {
        "text": rant,
        "tags": json.dumps(cleaned_tags),
        "edited": 1
    }
    await db.update("rants", changes, {"id": rant_id})
    return {"success": True}
@app.delete("/api/rant/rants/{rant_id}")
async def delete_rant(
    rant_id: int,
    app: int = 3,
    token_id: int = None,
    token_key: str = None,
    user_id: int = None
):
    """Delete a rant owned by the authenticated user.

    The rant's comments, the votes cast on the rant and its favorites are
    removed before the rant row itself. ``app`` is accepted but not used.

    Returns:
        ``{"success": True}`` on success, or ``success: False`` with an
        ``error`` for failed authentication, a missing rant or a rant owned
        by someone else.
    """
    requester_id = await authenticate_user(token_id, token_key, user_id)
    if not requester_id:
        return {"success": False, "error": "Authentication required"}

    # Check ownership
    target = await db.get("rants", {"id": rant_id})
    if not target:
        return {"success": False, "error": "Rant not found"}
    if target['user_id'] != requester_id:
        return {"success": False, "error": "Unauthorized"}

    # Delete rant and related data: dependent rows first, the rant last.
    removals = (
        ("comments", {"rant_id": rant_id}),
        ("votes", {"target_id": rant_id, "target_type": "rant"}),
        ("favorites", {"rant_id": rant_id}),
        ("rants", {"id": rant_id}),
    )
    for table, criteria in removals:
        await db.delete(table, criteria)
    return {"success": True}
@app.post("/api/rant/rants/{rant_id}/vote")
async def vote_rant(
    rant_id: int,
    vote: int = Form(...),
    reason: Optional[int] = Form(None),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Cast, change or remove the authenticated user's vote on a rant.

    Args:
        rant_id: rant being voted on.
        vote: ``1`` (up), ``-1`` (down) or ``0`` (remove an existing vote).
        reason: optional integer stored with the vote.
        app: accepted but not used.
        token_id, token_key, user_id: credentials, all required.

    Returns:
        ``{"success": True, "rant": ...}`` with the freshly formatted rant,
        or ``success: False`` with an ``error`` for failed authentication,
        an invalid vote value or an unknown rant.

    The rant's score and its author's score are both moved by the same
    delta, ``vote - previous_vote``. Re-sending an identical vote therefore
    changes nothing, and flipping a vote moves both scores by 2.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    # Anything outside -1/0/1 would be added to the scores verbatim.
    if vote not in (-1, 0, 1):
        return {"success": False, "error": "Invalid vote"}
    # Get rant
    rant = await db.get("rants", {"id": rant_id})
    if not rant:
        return {"success": False, "error": "Rant not found"}
    # Check for existing vote
    existing_vote = await db.get("votes", {
        "user_id": current_user_id,
        "target_id": rant_id,
        "target_type": "rant"
    })
    previous_vote = existing_vote['vote'] if existing_vote else 0
    # Net effect of this request on every score it touches.
    score_diff = vote - previous_vote

    if vote == 0:
        # Remove vote
        if existing_vote:
            await db.delete("votes", {"id": existing_vote['id']})
    elif existing_vote:
        # Update vote (also refreshes the stored reason)
        await db.update("votes", {
            "vote": vote,
            "reason": reason
        }, {"id": existing_vote['id']})
    else:
        # New vote
        await db.insert("votes", {
            "user_id": current_user_id,
            "target_id": rant_id,
            "target_type": "rant",
            "vote": vote,
            "reason": reason
        }, return_id=True)

    if score_diff:
        await db.update("rants", {
            "score": rant['score'] + score_diff
        }, {"id": rant_id})
        # Update the author's score by the same delta; skip if the author row is gone.
        author = await db.get("users", {"id": rant['user_id']})
        if author:
            await db.update("users", {
                "score": author['score'] + score_diff
            }, {"id": rant['user_id']})

    # Get updated rant with user info
    updated_rant = await db.query_one(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
u.avatar_b, u.avatar_i
FROM rants r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?""",
        (rant_id,)
    )
    rant_data = {
        'id': updated_rant['id'],
        'text': updated_rant['text'],
        'score': updated_rant['score'],
        'created_time': updated_rant['created_time'],
        'attached_image': updated_rant['attached_image'],
        'tags': updated_rant['tags'],
        'edited': updated_rant['edited'],
        'type': updated_rant['type']
    }
    user_data = {
        'id': updated_rant['user_id'],
        'username': updated_rant['username'],
        'score': updated_rant['user_score'],
        'avatar_b': updated_rant['avatar_b'],
        'avatar_i': updated_rant['avatar_i']
    }
    return {
        "success": True,
        "rant": await format_rant(rant_data, user_data, current_user_id)
    }
@app.post("/api/rant/rants/{rant_id}/favorite")
async def favorite_rant(
    rant_id: int,
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Add a rant to the authenticated user's favorites.

    ``app`` is accepted but not used.

    Returns:
        ``{"success": True}`` when the row was inserted. Any insert error is
        reported as "Already favorited"; failed authentication as
        "Authentication required".
    """
    fan_id = await authenticate_user(token_id, token_key, user_id)
    if not fan_id:
        return {"success": False, "error": "Authentication required"}

    new_favorite = {
        "user_id": fan_id,
        "rant_id": rant_id
    }
    try:
        await db.insert("favorites", new_favorite, return_id=True)
    except Exception:
        return {"success": False, "error": "Already favorited"}
    else:
        return {"success": True}
@app.post("/api/rant/rants/{rant_id}/unfavorite")
async def unfavorite_rant(
    rant_id: int,
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Remove a rant from the authenticated user's favorites.

    Returns ``{"success": True}`` without checking whether a favorite row
    actually existed. ``app`` is accepted but not used.
    """
    fan_id = await authenticate_user(token_id, token_key, user_id)
    if not fan_id:
        return {"success": False, "error": "Authentication required"}

    criteria = {
        "user_id": fan_id,
        "rant_id": rant_id
    }
    await db.delete("favorites", criteria)
    return {"success": True}
@app.post("/api/rant/rants/{rant_id}/comments")
async def create_comment(
    rant_id: int,
    comment: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...),
    image: Optional[UploadFile] = File(None)
):
    """Add a comment to a rant for the authenticated user.

    Args:
        rant_id: rant being commented on.
        comment: comment body.
        app: accepted but not used.
        token_id, token_key, user_id: credentials, all required.
        image: optional upload, saved under ``UPLOAD_DIR`` with a random name.

    Returns:
        ``{"success": True}`` on success;
        ``{"success": False, "confirmed": False}`` when authentication fails;
        ``{"success": False, "error": "Rant not found"}`` for an unknown rant.

    When the commenter is not the rant's owner, a "comment" notification is
    created for the owner.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "confirmed": False}
    # Check if rant exists; the row is reused below for the owner lookup.
    rant = await db.get("rants", {"id": rant_id})
    if not rant:
        return {"success": False, "error": "Rant not found"}
    # Handle image upload
    image_path = ""
    # An upload part can arrive with a missing or empty filename. splitext()
    # would raise on None and an empty name would store a junk file, so such
    # parts are treated as "no image".
    if image and image.filename:
        # NOTE(review): the extension comes from the client-supplied filename
        # and is kept as-is — consider restricting it to known image types.
        file_ext = os.path.splitext(image.filename)[1]
        file_name = f"{secrets.token_hex(16)}{file_ext}"
        (UPLOAD_DIR / file_name).write_bytes(await image.read())
        image_path = f"/uploads/{file_name}"
    # Create comment
    created_time = int(datetime.now().timestamp())
    comment_id = await db.insert("comments", {
        "rant_id": rant_id,
        "user_id": current_user_id,
        "body": comment,
        "created_time": created_time,
        "attached_image": image_path
    }, return_id=True)
    # Create notification for rant owner (never for one's own comment)
    if rant['user_id'] != current_user_id:
        await db.insert("notifications", {
            "user_id": rant['user_id'],
            "type": "comment",
            "rant_id": rant_id,
            "comment_id": comment_id,
            "from_user_id": current_user_id,
            "created_time": created_time
        }, return_id=True)
    return {"success": True}
@app.get("/api/comments/{comment_id}")
async def get_comment(
    comment_id: int,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Fetch a single comment together with its author's public fields.

    Authentication is optional: it is only attempted when ``token_id`` is
    truthy, and its result is handed to ``format_comment`` as the viewer.
    Returns an "Invalid comment specified in path." failure payload when no
    row matches ``comment_id``.
    """
    if token_id:
        viewer_id = await authenticate_user(token_id, token_key, user_id)
    else:
        viewer_id = None

    record = await db.query_one(
        "SELECT c.*, u.id as user_id, u.username, u.score as user_score,\n"
        "u.avatar_b, u.avatar_i\n"
        "FROM comments c\n"
        "JOIN users u ON c.user_id = u.id\n"
        "WHERE c.id = ?",
        (comment_id,)
    )
    if not record:
        return {"success": False, "error": "Invalid comment specified in path."}

    # Split the joined row into the comment part and the author part.
    comment_fields = ('id', 'rant_id', 'body', 'score', 'created_time')
    comment_data = {field: record[field] for field in comment_fields}
    user_data = {
        'id': record['user_id'],
        'username': record['username'],
        'score': record['user_score'],
        'avatar_b': record['avatar_b'],
        'avatar_i': record['avatar_i']
    }

    formatted = await format_comment(comment_data, user_data, viewer_id)
    return {"success": True, "comment": formatted}
@app.post("/api/comments/{comment_id}")
async def update_comment(
    comment_id: int,
    comment: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Replace the body of a comment owned by the authenticated user.

    Fails with an ``error`` payload when authentication fails or the comment
    does not exist, and with ``{"success": False, "fail_reason":
    "Unauthorized"}`` when the comment belongs to another user. On success
    the new body is stored and the comment's ``edited`` column is set to 1.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if not caller_id:
        return {"success": False, "error": "Authentication required"}

    existing = await db.get("comments", {"id": comment_id})
    if not existing:
        return {"success": False, "error": "Invalid comment specified in path."}

    # Only the author may edit the comment.
    if existing['user_id'] != caller_id:
        return {"success": False, "fail_reason": "Unauthorized"}

    changes = {"body": comment, "edited": 1}
    await db.update("comments", changes, {"id": comment_id})
    return {"success": True}
@app.delete("/api/comments/{comment_id}")
async def delete_comment(
    comment_id: int,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Delete a comment owned by the authenticated user, plus rows that point at it.

    Args:
        comment_id: Id of the comment to delete (path parameter).
        app: Client application id sent by the client; not used here.
        token_id, token_key, user_id: Credentials passed to
            ``authenticate_user``. They default to ``None``, in which case
            the call is expected to fail authentication.

    Returns:
        ``{"success": True}`` on success, otherwise ``{"success": False,
        "error": ...}`` with "Authentication required", "Invalid comment
        specified in path." or "Unauthorized".
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}

    # Check ownership
    comment_row = await db.get("comments", {"id": comment_id})
    if not comment_row:
        return {"success": False, "error": "Invalid comment specified in path."}
    if comment_row['user_id'] != current_user_id:
        return {"success": False, "error": "Unauthorized"}

    # Delete dependent rows first so nothing is left referencing a comment
    # that no longer exists: votes cast on it and notifications raised for it.
    await db.delete("votes", {"target_id": comment_id, "target_type": "comment"})
    await db.delete("notifications", {"comment_id": comment_id})
    await db.delete("comments", {"id": comment_id})
    return {"success": True}
@app.post("/api/comments/{comment_id}/vote")
async def vote_comment(
    comment_id: int,
    vote: int = Form(...),
    reason: Optional[int] = Form(None),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Cast, change or remove the authenticated user's vote on a comment.

    Args:
        comment_id: Id of the comment being voted on (path parameter).
        vote: ``1`` (upvote), ``-1`` (downvote) or ``0`` (remove the vote).
            Any other value is rejected, because the value is added directly
            to the comment's score.
        reason: Optional reason code stored alongside the vote.
        app: Client application id sent by the client; not used here.
        token_id, token_key, user_id: Credentials passed to
            ``authenticate_user``.

    Returns:
        ``{"success": True}`` on success, otherwise ``{"success": False,
        "error": ...}``.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}

    # An unchecked value would let a single request move the score by an
    # arbitrary amount.
    if vote not in (-1, 0, 1):
        return {"success": False, "error": "Invalid vote value"}

    # Get comment
    comment = await db.get("comments", {"id": comment_id})
    if not comment:
        return {"success": False, "error": "Invalid comment specified in path."}

    # Check for existing vote
    existing_vote = await db.get("votes", {
        "user_id": current_user_id,
        "target_id": comment_id,
        "target_type": "comment"
    })
    previous_vote = existing_vote['vote'] if existing_vote else 0

    if vote == 0:
        if not existing_vote:
            # Nothing to remove, so the score is left alone.
            return {"success": True}
        await db.delete("votes", {"id": existing_vote['id']})
    elif existing_vote:
        # Change the stored vote (and its reason).
        await db.update("votes", {
            "vote": vote,
            "reason": reason
        }, {"id": existing_vote['id']})
    else:
        # First vote by this user on this comment.
        await db.insert("votes", {
            "user_id": current_user_id,
            "target_id": comment_id,
            "target_type": "comment",
            "vote": vote,
            "reason": reason
        }, return_id=True)

    # The score moves by the difference between the new and the old vote.
    # NOTE(review): this is a read-modify-write on the score read above;
    # concurrent votes on the same comment can overwrite each other.
    await db.update("comments", {
        "score": comment['score'] + (vote - previous_vote)
    }, {"id": comment_id})
    return {"success": True}
@app.get("/api/users/{user_id}")
async def get_profile(
    user_id: int,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    auth_user_id: Optional[int] = None
):
    """Return a user's profile with their latest rants, comments and favorites.

    Authentication is optional and only attempted when ``token_id`` is
    truthy; the resulting viewer id is passed to ``format_rant`` and
    ``format_comment``. Each of the three content lists is capped at 50
    entries by its query. Returns a "User not found" failure payload when
    ``user_id`` matches no user.
    """
    rant_keys = ('id', 'text', 'score', 'created_time',
                 'attached_image', 'tags', 'edited', 'type')
    comment_keys = ('id', 'rant_id', 'body', 'score', 'created_time')

    def pick(row, keys):
        # Copy the named columns of a joined row into a plain dict.
        return {key: row[key] for key in keys}

    def author_of(row):
        # Author columns as aliased by the SELECT statements below.
        return {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i']
        }

    if token_id:
        current_user_id = await authenticate_user(token_id, token_key, auth_user_id)
    else:
        current_user_id = None

    user = await db.get("users", {"id": user_id})
    if not user:
        return {"success": False, "error": "User not found"}

    # Shared head of the two rant queries (own rants and favorited rants).
    rant_select = (
        "SELECT r.*, u.id as user_id, u.username, u.score as user_score,\n"
        "u.avatar_b, u.avatar_i\n"
        "FROM rants r\n"
        "JOIN users u ON r.user_id = u.id\n"
    )

    # Rants written by the user, newest first.
    rant_rows = await db.query_raw(
        rant_select
        + "WHERE r.user_id = ?\n"
        + "ORDER BY r.created_time DESC\n"
        + "LIMIT 50",
        (user_id,)
    )
    rants = [
        await format_rant(pick(row, rant_keys), author_of(row), current_user_id)
        for row in rant_rows
    ]

    # Comments written by the user, newest first.
    comment_rows = await db.query_raw(
        "SELECT c.*, u.id as user_id, u.username, u.score as user_score,\n"
        "u.avatar_b, u.avatar_i\n"
        "FROM comments c\n"
        "JOIN users u ON c.user_id = u.id\n"
        "WHERE c.user_id = ?\n"
        "ORDER BY c.created_time DESC\n"
        "LIMIT 50",
        (user_id,)
    )
    comments = [
        await format_comment(pick(row, comment_keys), author_of(row), current_user_id)
        for row in comment_rows
    ]

    # Rants the user has favorited, most recently favorited first.
    favorite_rows = await db.query_raw(
        rant_select
        + "JOIN favorites f ON f.rant_id = r.id\n"
        + "WHERE f.user_id = ?\n"
        + "ORDER BY f.id DESC\n"
        + "LIMIT 50",
        (user_id,)
    )
    favorites = [
        await format_rant(pick(row, rant_keys), author_of(row), current_user_id)
        for row in favorite_rows
    ]

    avatar = {
        "b": user['avatar_b'],
        "i": user['avatar_i'] or ""
    }
    content = {
        "content": {
            "rants": rants,
            "comments": comments,
            "favorites": favorites
        }
    }
    profile = {
        "username": user['username'],
        "score": user['score'],
        "about": user['about'],
        "location": user['location'],
        "created_time": user['created_time'],
        "skills": user['skills'],
        "github": user['github'],
        "website": user['website'],
        "avatar": avatar,
        "content": content
    }
    return {"success": True, "profile": profile}
@app.get("/api/get-user-id")
async def get_user_id(
    username: str,
    app: int = 3
):
    """Look up a user's numeric id by username.

    Returns ``{"success": True, "user_id": ...}`` when a user with that
    username exists, otherwise a "User not found" failure payload.
    """
    match = await db.get("users", {"username": username})
    if match:
        return {"success": True, "user_id": match['id']}
    return {"success": False, "error": "User not found"}
def _escape_like(term: str) -> str:
    """Escape ``\\``, ``%`` and ``_`` so *term* matches literally in a LIKE pattern."""
    return (
        term.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )


@app.get("/api/rant/search")
async def search(
    term: str,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Search rants whose text or tags contain ``term``.

    Args:
        term: Substring to look for. It is matched literally: ``%`` and ``_``
            in the term are escaped instead of acting as LIKE wildcards.
        app: Client application id sent by the client; not used here.
        token_id, token_key, user_id: Optional credentials; authentication is
            only attempted when ``token_id`` is truthy.

    Returns:
        ``{"success": True, "results": [...]}`` with at most 50 formatted
        rants ordered by score, highest first. A blank term yields an empty
        result list rather than matching every rant.
    """
    current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None

    # A blank term would turn the pattern into "%%" and match everything.
    if not term.strip():
        return {"success": True, "results": []}

    pattern = f"%{_escape_like(term)}%"
    # Search rants
    rows = await db.query_raw(
        """SELECT r.*, u.id as user_id, u.username, u.score as user_score,
                  u.avatar_b, u.avatar_i
           FROM rants r
           JOIN users u ON r.user_id = u.id
           WHERE r.text LIKE ? ESCAPE '\\' OR r.tags LIKE ? ESCAPE '\\'
           ORDER BY r.score DESC
           LIMIT 50""",
        (pattern, pattern)
    )

    results = []
    for row in rows:
        rant_data = {
            'id': row['id'],
            'text': row['text'],
            'score': row['score'],
            'created_time': row['created_time'],
            'attached_image': row['attached_image'],
            'tags': row['tags'],
            'edited': row['edited'],
            'type': row['type']
        }
        user_data = {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i']
        }
        results.append(await format_rant(rant_data, user_data, current_user_id))
    return {"success": True, "results": results}
@app.get("/api/users/me/notif-feed")
async def get_notifications(
    ext_prof: int = 1,
    last_time: Optional[int] = None,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return the authenticated user's latest notifications and mark them read.

    Up to 50 notifications are returned, newest first, each with its ``read``
    flag as it was before this call. If any were returned, every notification
    of the user is then updated to ``read = 1``. ``ext_prof`` and
    ``last_time`` are accepted but not used by this handler.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if not caller_id:
        return {"success": False, "error": "Authentication required"}

    notification_rows = await db.query_raw(
        "SELECT n.*, u.username\n"
        "FROM notifications n\n"
        "LEFT JOIN users u ON n.from_user_id = u.id\n"
        "WHERE n.user_id = ?\n"
        "ORDER BY n.created_time DESC\n"
        "LIMIT 50",
        (caller_id,)
    )

    items = []
    unread_count = 0
    for entry in notification_rows:
        items.append({
            "type": entry['type'],
            "rant_id": entry['rant_id'],
            "comment_id": entry['comment_id'],
            "created_time": entry['created_time'],
            "read": entry['read'],
            "uid": entry['from_user_id'],
            # LEFT JOIN: the sender may be missing, in which case username is empty.
            "username": entry['username'] or ""
        })
        unread_count += 0 if entry['read'] else 1

    # Skip the write entirely when there was nothing to show.
    if notification_rows:
        await db.update("notifications", {"read": 1}, {"user_id": caller_id})

    # Every unread notification is reported under "comments"; the other
    # categories are always zero here.
    unread_summary = {
        "all": unread_count,
        "upvotes": 0,
        "mentions": 0,
        "comments": unread_count,
        "subs": 0,
        "total": unread_count
    }
    return {
        "success": True,
        "data": {
            "items": items,
            "check_time": int(datetime.now().timestamp()),
            "username_map": [],
            "unread": unread_summary,
            "num_unread": unread_count
        }
    }
@app.delete("/api/users/me/notif-feed")
async def clear_notifications(
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Delete every notification belonging to the authenticated user.

    Returns an "Authentication required" failure payload when the posted
    token does not authenticate, otherwise ``{"success": True}``.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if not caller_id:
        return {"success": False, "error": "Authentication required"}

    owned_by_caller = {"user_id": caller_id}
    await db.delete("notifications", owned_by_caller)
    return {"success": True}
@app.post("/api/users/me/edit-profile")
async def edit_profile(
    profile_about: str = Form(""),
    profile_skills: str = Form(""),
    profile_location: str = Form(""),
    profile_website: str = Form(""),
    profile_github: str = Form(""),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Overwrite the authenticated user's editable profile fields.

    All five profile fields are written on every call; a field omitted from
    the form falls back to its default of ``""`` and therefore clears the
    stored value.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if not caller_id:
        return {"success": False, "error": "Authentication required"}

    # Form field name -> users table column.
    profile_columns = {
        "about": profile_about,
        "skills": profile_skills,
        "location": profile_location,
        "website": profile_website,
        "github": profile_github
    }
    await db.update("users", profile_columns, {"id": caller_id})
    return {"success": True}
@app.post("/api/users/forgot-password")
async def forgot_password(
    username: str = Form(...),
    app: int = Form(3)
):
    """Placeholder for the password-reset endpoint.

    The username is accepted but not looked up and no email is sent; the
    handler always reports success.
    """
    # A real implementation would send a reset email here.
    return {"success": True}
@app.post("/api/users/me/resend-confirm")
async def resend_confirmation(
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Placeholder for re-sending the account confirmation email.

    Only authentication is performed; no email is sent. Returns an
    "Authentication required" failure payload or ``{"success": True}``.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if caller_id:
        # A real implementation would send the confirmation email here.
        return {"success": True}
    return {"success": False, "error": "Authentication required"}
@app.post("/api/users/me/mark-news-read")
async def mark_news_read(
    news_id: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Placeholder for marking a news item as read.

    Only authentication is performed; ``news_id`` is accepted but nothing is
    stored. Returns an "Authentication required" failure payload or
    ``{"success": True}``.
    """
    caller_id = await authenticate_user(token_id, token_key, user_id)
    if caller_id:
        # A real implementation would record the read state here.
        return {"success": True}
    return {"success": False, "error": "Authentication required"}
# Serve uploaded files
@app.get("/uploads/{filename}")
async def get_upload(filename: str):
    """Serve a previously uploaded file from ``UPLOAD_DIR``.

    Args:
        filename: Name of the file inside the upload directory.

    Returns:
        A ``FileResponse`` for the file.

    Raises:
        HTTPException: 404 when the name does not refer to a regular file
            located directly inside the upload directory.
    """
    upload_root = UPLOAD_DIR.resolve()
    file_path = (upload_root / filename).resolve()
    # Resolve first, then require the result to sit directly in the upload
    # directory: this rejects "..", separators that survive routing (e.g.
    # backslashes on Windows) and anything else that would escape it.
    # is_file() also rejects directories, which exists() let through.
    if file_path.parent != upload_root or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(file_path)
# Make sure the "static" directory exists before it is mounted below;
# exist_ok=True makes this a no-op when the directory is already there.
Path("static").mkdir(exist_ok=True)
# Serve the frontend's files from that directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
if __name__ == "__main__":
    # Imported here because uvicorn is only needed when this file is run
    # directly as a script.
    import uvicorn

    # Listen on all interfaces, port 8111.
    uvicorn.run(app, port=8111, host="0.0.0.0")