from fastapi import FastAPI, HTTPException, Depends, Form, File, UploadFile, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, EmailStr from typing import Optional, List, Dict, Any, Literal from datetime import datetime, timedelta import hashlib import secrets import json import os from pathlib import Path import asyncio from ads import AsyncDataSet app = FastAPI(title="Rant Community API") # Enable CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Database setup DB_PATH = "rant_community.db" UPLOAD_DIR = Path("uploads") UPLOAD_DIR.mkdir(exist_ok=True) # Template setup templates = Jinja2Templates(directory="templates") # Initialize AsyncDataSet db = AsyncDataSet(DB_PATH) async def init_db(): # Users table await db.query_raw(""" CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, score INTEGER DEFAULT 0, about TEXT DEFAULT '', location TEXT DEFAULT '', skills TEXT DEFAULT '', github TEXT DEFAULT '', website TEXT DEFAULT '', created_time INTEGER NOT NULL, avatar_b TEXT DEFAULT '7bc8a4', avatar_i TEXT, updated_at TEXT, uid TEXT, created_at TEXT, deleted_at TEXT ); """) await db.query_raw(""" CREATE TABLE auth_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, token_key TEXT UNIQUE NOT NULL, expire_time INTEGER NOT NULL, uid TEXT, created_at TEXT, updated_at TEXT, deleted_at TEXT, FOREIGN KEY (user_id) REFERENCES users (id) ); """) await db.query_raw(""" CREATE TABLE rants ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, text TEXT NOT NULL, score INTEGER DEFAULT 0, created_time INTEGER NOT NULL, attached_image TEXT DEFAULT '', tags TEXT DEFAULT '', edited BOOLEAN DEFAULT 0, type INTEGER DEFAULT 1, updated_at TEXT, uid TEXT, created_at TEXT, deleted_at TEXT, FOREIGN KEY (user_id) REFERENCES users (id) ); """) await db.query_raw(""" CREATE TABLE comments ( id INTEGER PRIMARY KEY AUTOINCREMENT, rant_id INTEGER NOT NULL, user_id INTEGER NOT NULL, body TEXT NOT NULL, score INTEGER DEFAULT 0, created_time INTEGER NOT NULL, attached_image TEXT DEFAULT '', edited BOOLEAN DEFAULT 0, uid TEXT, created_at TEXT, updated_at TEXT, deleted_at TEXT, FOREIGN KEY (rant_id) REFERENCES rants (id), FOREIGN KEY (user_id) REFERENCES users (id) ); """) await db.query_raw(""" CREATE TABLE votes ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, target_id INTEGER NOT NULL, target_type TEXT NOT NULL, vote INTEGER NOT NULL, reason INTEGER, uid TEXT, created_at TEXT, updated_at TEXT, deleted_at TEXT, UNIQUE(user_id, target_id, target_type), FOREIGN KEY (user_id) REFERENCES users (id) ); """) await db.query_raw(""" CREATE TABLE favorites ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, rant_id INTEGER NOT NULL, uid TEXT, created_at TEXT, updated_at TEXT, deleted_at TEXT, UNIQUE(user_id, rant_id), FOREIGN KEY (user_id) REFERENCES users (id), FOREIGN KEY (rant_id) REFERENCES rants (id) ); """) await db.query_raw(""" CREATE TABLE notifications ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, type TEXT NOT NULL, rant_id INTEGER, comment_id INTEGER, from_user_id INTEGER, created_time INTEGER NOT NULL, read BOOLEAN DEFAULT 0, uid TEXT, created_at TEXT, updated_at TEXT, deleted_at TEXT, FOREIGN KEY (user_id) REFERENCES users (id) ); """) # Run init_db on startup @app.on_event("startup") async def startup_event(): await init_db() # Create necessary directories Path("templates").mkdir(exist_ok=True) Path("templates/components").mkdir(exist_ok=True) Path("static").mkdir(exist_ok=True) # Pydantic models class UserRegister(BaseModel): email: EmailStr username: str password: str class UserLogin(BaseModel): username: str password: str class RantCreate(BaseModel): rant: str tags: str type: int = 1 class CommentCreate(BaseModel): comment: str class VoteRequest(BaseModel): vote: Literal[-1, 0, 1] reason: Optional[int] = None # Helper functions def hash_password(password: str) -> str: return hashlib.sha256(password.encode()).hexdigest() def generate_token() -> str: return secrets.token_urlsafe(32) def format_time(timestamp): date = datetime.fromtimestamp(timestamp) now = datetime.now() diff = (now - date).total_seconds() if diff < 60: return 'just now' if diff < 3600: return f'{int(diff / 60)}m ago' if diff < 86400: return f'{int(diff / 3600)}h ago' if diff < 604800: return f'{int(diff / 86400)}d ago' return date.strftime('%Y-%m-%d') templates.env.globals['format_time'] = format_time # Add template context processors def escape_html(text): return text.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') templates.env.globals['escape_html'] = escape_html # Authentication helpers async def get_current_user_from_cookie(request: Request): """Get current user from session cookie""" auth_token = request.cookies.get("auth_token") if not auth_token: return None try: token_data = json.loads(auth_token) token = await db.get("auth_tokens", { "id": token_data['id'], "token_key": token_data['key'], "user_id": token_data['user_id'] }) if not token or token['expire_time'] <= int(datetime.now().timestamp()): return None user = await db.get("users", {"id": token_data['user_id']}) if user: # Add token info to user object for client-side API calls user['token_id'] = token_data['id'] user['token_key'] = token_data['key'] return user except: return None async def authenticate_user(token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None): """Generic authentication function that works with any parameter source""" if not all([token_id, token_key, user_id]): return None token = await db.get("auth_tokens", { "id": token_id, "token_key": token_key, "user_id": user_id }) if not token or token['expire_time'] <= int(datetime.now().timestamp()): return None return user_id async def format_rant(rant_row, user_row, current_user_id=None): # Get comment count comment_count = await db.count("comments", {"rant_id": rant_row['id']}) # Get vote state for current user vote_state = 0 if current_user_id: vote = await db.get("votes", { "user_id": current_user_id, "target_id": rant_row['id'], "target_type": "rant" }) if vote: vote_state = vote['vote'] tags = json.loads(rant_row['tags']) if rant_row['tags'] else [] return { "id": rant_row['id'], "text": rant_row['text'], "score": rant_row['score'], "created_time": rant_row['created_time'], "attached_image": rant_row['attached_image'], "num_comments": comment_count, "tags": tags, "vote_state": vote_state, "edited": bool(rant_row['edited']), "rt": rant_row['type'], "rc": 1, "user_id": user_row['id'], "user_username": user_row['username'], "user_score": user_row['score'], "user_avatar": { "b": user_row['avatar_b'], "i": user_row['avatar_i'] or "" }, "user_avatar_lg": { "b": user_row['avatar_b'], "i": user_row['avatar_i'] or "" } } async def format_comment(comment_row, user_row, current_user_id=None): # Get vote state for current user vote_state = 0 if current_user_id: vote = await db.get("votes", { "user_id": current_user_id, "target_id": comment_row['id'], "target_type": "comment" }) if vote: vote_state = vote['vote'] return { "id": comment_row['id'], "rant_id": comment_row['rant_id'], "body": comment_row['body'], "score": comment_row['score'], "created_time": comment_row['created_time'], "vote_state": vote_state, "user_id": user_row['id'], "user_username": user_row['username'], "user_score": user_row['score'], "user_avatar": { "b": user_row['avatar_b'], "i": user_row['avatar_i'] or "" } } # SSR Routes @app.get("/", response_class=HTMLResponse) async def home(request: Request, sort: str = "recent"): current_user = await get_current_user_from_cookie(request) current_user_id = current_user['id'] if current_user else None # Get rants order_by = "r.created_time DESC" if sort == "recent" else "r.score DESC" rows = await db.query_raw( f"""SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id ORDER BY {order_by} LIMIT 50 OFFSET 0""", () ) rants = [] for row in rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } rants.append(await format_rant(rant_data, user_data, current_user_id)) # Get notification count notif_count = 0 if current_user: notif_count = await db.count("notifications", {"user_id": current_user['id'], "read": 0}) return templates.TemplateResponse("feed.html", { "request": request, "current_user": current_user, "notif_count": notif_count, "rants": rants, "current_sort": sort, "escape_html": escape_html, "format_time": format_time }) @app.get("/rant/{rant_id}", response_class=HTMLResponse) async def rant_detail(request: Request, rant_id: int): current_user = await get_current_user_from_cookie(request) current_user_id = current_user['id'] if current_user else None # Get rant with user info rant_row = await db.query_one( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.id = ?""", (rant_id,) ) if not rant_row: raise HTTPException(status_code=404, detail="Rant not found") rant_data = { 'id': rant_row['id'], 'text': rant_row['text'], 'score': rant_row['score'], 'created_time': rant_row['created_time'], 'attached_image': rant_row['attached_image'], 'tags': rant_row['tags'], 'edited': rant_row['edited'], 'type': rant_row['type'] } user_data = { 'id': rant_row['user_id'], 'username': rant_row['username'], 'score': rant_row['user_score'], 'avatar_b': rant_row['avatar_b'], 'avatar_i': rant_row['avatar_i'] } rant = await format_rant(rant_data, user_data, current_user_id) # Get comments comment_rows = await db.query_raw( """SELECT c.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM comments c JOIN users u ON c.user_id = u.id WHERE c.rant_id = ? ORDER BY c.created_time ASC""", (rant_id,) ) comments = [] for row in comment_rows: comment_data = { 'id': row['id'], 'rant_id': row['rant_id'], 'body': row['body'], 'score': row['score'], 'created_time': row['created_time'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } comments.append(await format_comment(comment_data, user_data, current_user_id)) # Check if subscribed (favorited) subscribed = 0 if current_user_id: if await db.exists("favorites", {"user_id": current_user_id, "rant_id": rant_id}): subscribed = 1 # Get notification count notif_count = 0 if current_user: notif_count = await db.count("notifications", {"user_id": current_user['id'], "read": 0}) return templates.TemplateResponse("rant_detail.html", { "request": request, "current_user": current_user, "notif_count": notif_count, "rant": rant, "comments": comments, "subscribed": subscribed, "escape_html": escape_html, "format_time": format_time }) @app.get("/profile/{user_id}", response_class=HTMLResponse) async def profile(request: Request, user_id: int, tab: str = "rants"): current_user = await get_current_user_from_cookie(request) current_user_id = current_user['id'] if current_user else None # Get user user = await db.get("users", {"id": user_id}) if not user: raise HTTPException(status_code=404, detail="User not found") # Get user's rants rant_rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.user_id = ? ORDER BY r.created_time DESC LIMIT 50""", (user_id,) ) rants = [] for row in rant_rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } rants.append(await format_rant(rant_data, user_data, current_user_id)) # Get user's comments comment_rows = await db.query_raw( """SELECT c.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM comments c JOIN users u ON c.user_id = u.id WHERE c.user_id = ? ORDER BY c.created_time DESC LIMIT 50""", (user_id,) ) comments = [] for row in comment_rows: comment_data = { 'id': row['id'], 'rant_id': row['rant_id'], 'body': row['body'], 'score': row['score'], 'created_time': row['created_time'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } comments.append(await format_comment(comment_data, user_data, current_user_id)) # Get favorited rants favorite_rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id JOIN favorites f ON f.rant_id = r.id WHERE f.user_id = ? ORDER BY f.id DESC LIMIT 50""", (user_id,) ) favorites = [] for row in favorite_rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } favorites.append(await format_rant(rant_data, user_data, current_user_id)) # Get notification count notif_count = 0 if current_user: notif_count = await db.count("notifications", {"user_id": current_user['id'], "read": 0}) profile_data = { "username": user['username'], "score": user['score'], "about": user['about'], "location": user['location'], "created_time": user['created_time'], "skills": user['skills'], "github": user['github'], "website": user['website'], "avatar": { "b": user['avatar_b'], "i": user['avatar_i'] or "" } } return templates.TemplateResponse("profile.html", { "request": request, "current_user": current_user, "notif_count": notif_count, "profile": profile_data, "profile_user_id": user_id, "rants": rants, "comments": comments, "favorites": favorites, "active_tab": tab, "escape_html": escape_html, "format_time": format_time }) @app.get("/search", response_class=HTMLResponse) async def search_page(request: Request, term: str = None): current_user = await get_current_user_from_cookie(request) current_user_id = current_user['id'] if current_user else None results = [] if term: # Search rants rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.text LIKE ? OR r.tags LIKE ? ORDER BY r.score DESC LIMIT 50""", (f'%{term}%', f'%{term}%') ) for row in rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } results.append(await format_rant(rant_data, user_data, current_user_id)) # Get notification count notif_count = 0 if current_user: notif_count = await db.count("notifications", {"user_id": current_user['id'], "read": 0}) return templates.TemplateResponse("search.html", { "request": request, "current_user": current_user, "notif_count": notif_count, "search_term": term, "results": results, "escape_html": escape_html, "format_time": format_time }) @app.get("/notifications", response_class=HTMLResponse) async def notifications(request: Request): current_user = await get_current_user_from_cookie(request) if not current_user: return RedirectResponse("/", status_code=302) # Get notifications rows = await db.query_raw( """SELECT n.*, u.username FROM notifications n LEFT JOIN users u ON n.from_user_id = u.id WHERE n.user_id = ? ORDER BY n.created_time DESC LIMIT 50""", (current_user['id'],) ) items = [] for row in rows: item = { "type": row['type'], "rant_id": row['rant_id'], "comment_id": row['comment_id'], "created_time": row['created_time'], "read": row['read'], "uid": row['from_user_id'], "username": row['username'] or "" } items.append(item) # Mark notifications as read if rows: await db.update("notifications", {"read": 1}, {"user_id": current_user['id']}) return templates.TemplateResponse("notifications.html", { "request": request, "current_user": current_user, "notif_count": 0, # We just marked them as read "items": items, "format_time": format_time }) @app.get("/classic", response_class=HTMLResponse) async def classic(): return FileResponse("classic.html") # Authentication endpoints with cookie support @app.post("/login") async def login_form( request: Request, response: Response, username: str = Form(...), password: str = Form(...) ): # Find user by username or email user = await db.query_one( "SELECT * FROM users WHERE username = ? OR email = ?", (username, username) ) if not user or user['password_hash'] != hash_password(password): return templates.TemplateResponse("login.html", { "request": request, "error": "Invalid login credentials entered. Please try again." }) # Create auth token token_key = generate_token() expire_time = int((datetime.now() + timedelta(days=30)).timestamp()) token_id = await db.insert("auth_tokens", { "user_id": user['id'], "token_key": token_key, "expire_time": expire_time }, return_id=True) # Set cookie auth_token = { "id": token_id, "key": token_key, "expire_time": expire_time, "user_id": user['id'] } response = RedirectResponse("/", status_code=302) response.set_cookie("auth_token", json.dumps(auth_token), max_age=30*24*60*60) return response @app.get("/logout") async def logout(response: Response): response = RedirectResponse("/", status_code=302) response.delete_cookie("auth_token") return response # REST API Endpoints (keeping all existing endpoints) @app.post("/api/users") async def register_user( email: str = Form(...), username: str = Form(...), password: str = Form(...), type: int = Form(1), app: int = Form(3) ): # Validate username length if len(username) < 4 or len(username) > 15: return { "success": False, "error": "Your username must be between 4 and 15 characters.", "error_field": "username" } # Check if username exists if await db.exists("users", {"username": username}): return { "success": False, "error": "Username already taken.", "error_field": "username" } # Check if email exists if await db.exists("users", {"email": email}): return { "success": False, "error": "Email already registered.", "error_field": "email" } # Create user password_hash = hash_password(password) created_time = int(datetime.now().timestamp()) try: await db.insert("users", { "username": username, "email": email, "password_hash": password_hash, "created_time": created_time }, return_id=True) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} @app.post("/api/users/auth-token") async def login( username: str = Form(...), password: str = Form(...), app: int = Form(3) ): # Find user by username or email user = await db.query_one( "SELECT * FROM users WHERE username = ? OR email = ?", (username, username) ) if not user or user['password_hash'] != hash_password(password): return { "success": False, "error": "Invalid login credentials entered. Please try again." } # Create auth token token_key = generate_token() expire_time = int((datetime.now() + timedelta(days=30)).timestamp()) token_id = await db.insert("auth_tokens", { "user_id": user['id'], "token_key": token_key, "expire_time": expire_time }, return_id=True) return { "success": True, "auth_token": { "id": token_id, "key": token_key, "expire_time": expire_time, "user_id": user['id'] } } @app.get("/api/rant/rants") async def get_rants( sort: str = "recent", limit: int = 20, skip: int = 0, app: int = 3, token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None # Get rants with user info order_by = "r.created_time DESC" if sort == "recent" else "r.score DESC" rows = await db.query_raw( f"""SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id ORDER BY {order_by} LIMIT ? OFFSET ?""", (limit, skip) ) rants = [] for row in rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } rants.append(await format_rant(rant_data, user_data, current_user_id)) return { "success": True, "rants": rants, "settings": {"notif_state": -1, "notif_token": ""}, "set": secrets.token_hex(7), "wrw": 385, "dpp": 0, "num_notifs": 0, "unread": {"total": 0}, "news": { "id": 356, "type": "intlink", "headline": "Weekly Group Rant", "body": "Share your thoughts!", "footer": "Add tag 'wk247' to your rant", "height": 100, "action": "grouprant" } } @app.get("/api/rant/rants/{rant_id}") async def get_rant( rant_id: int, app: int = 3, last_comment_id: Optional[int] = None, token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None # Get rant with user info rant_row = await db.query_one( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.id = ?""", (rant_id,) ) if not rant_row: raise HTTPException(status_code=404, detail="Rant not found") rant_data = { 'id': rant_row['id'], 'text': rant_row['text'], 'score': rant_row['score'], 'created_time': rant_row['created_time'], 'attached_image': rant_row['attached_image'], 'tags': rant_row['tags'], 'edited': rant_row['edited'], 'type': rant_row['type'] } user_data = { 'id': rant_row['user_id'], 'username': rant_row['username'], 'score': rant_row['user_score'], 'avatar_b': rant_row['avatar_b'], 'avatar_i': rant_row['avatar_i'] } rant = await format_rant(rant_data, user_data, current_user_id) # Get comments comment_rows = await db.query_raw( """SELECT c.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM comments c JOIN users u ON c.user_id = u.id WHERE c.rant_id = ? ORDER BY c.created_time ASC""", (rant_id,) ) comments = [] for row in comment_rows: comment_data = { 'id': row['id'], 'rant_id': row['rant_id'], 'body': row['body'], 'score': row['score'], 'created_time': row['created_time'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } comments.append(await format_comment(comment_data, user_data, current_user_id)) # Check if subscribed (favorited) subscribed = 0 if current_user_id: if await db.exists("favorites", {"user_id": current_user_id, "rant_id": rant_id}): subscribed = 1 # Add link to rant rant['link'] = f"rants/{rant_id}/{rant['text'][:50].replace(' ', '-')}" return { "rant": rant, "comments": comments, "success": True, "subscribed": subscribed } @app.post("/api/rant/rants") async def create_rant( rant: str = Form(...), tags: str = Form(...), type: int = Form(1), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...), image: Optional[UploadFile] = File(None) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Check for duplicate rant recent_time = int(datetime.now().timestamp()) - 300 duplicate = await db.query_one( "SELECT id FROM rants WHERE user_id = ? AND text = ? AND created_time > ?", (current_user_id, rant, recent_time) ) if duplicate: return { "success": False, "error": "It looks like you just posted this same rant! Your connection might have timed out while posting so you might have seen an error, but sometimes the rant still gets posted and in this case it seems it did, so please check :) If this was not the case please contact info@rant.io. Thanks!" } # Handle image upload image_path = "" if image: file_ext = os.path.splitext(image.filename)[1] file_name = f"{secrets.token_hex(16)}{file_ext}" file_path = UPLOAD_DIR / file_name with open(file_path, "wb") as f: f.write(await image.read()) image_path = f"/uploads/{file_name}" # Create rant created_time = int(datetime.now().timestamp()) tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()] rant_id = await db.insert("rants", { "user_id": current_user_id, "text": rant, "created_time": created_time, "attached_image": image_path, "tags": json.dumps(tags_list), "type": type }, return_id=True) return {"success": True, "rant_id": rant_id} @app.post("/api/rant/rants/{rant_id}") async def update_rant( rant_id: int, rant: str = Form(...), tags: str = Form(...), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Check ownership rant_row = await db.get("rants", {"id": rant_id}) if not rant_row or rant_row['user_id'] != current_user_id: return {"success": False, "fail_reason": "Unauthorized"} # Update rant tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()] await db.update("rants", { "text": rant, "tags": json.dumps(tags_list), "edited": 1 }, {"id": rant_id}) return {"success": True} @app.delete("/api/rant/rants/{rant_id}") async def delete_rant( rant_id: int, app: int = 3, token_id: int = None, token_key: str = None, user_id: int = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Check ownership rant_row = await db.get("rants", {"id": rant_id}) if not rant_row: return {"success": False, "error": "Rant not found"} if rant_row['user_id'] != current_user_id: return {"success": False, "error": "Unauthorized"} # Delete rant and related data await db.delete("comments", {"rant_id": rant_id}) await db.delete("votes", {"target_id": rant_id, "target_type": "rant"}) await db.delete("favorites", {"rant_id": rant_id}) await db.delete("rants", {"id": rant_id}) return {"success": True} @app.post("/api/rant/rants/{rant_id}/vote") async def vote_rant( rant_id: int, vote: int = Form(...), reason: Optional[int] = Form(None), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Get rant rant = await db.get("rants", {"id": rant_id}) if not rant: return {"success": False, "error": "Rant not found"} # Check for existing vote existing_vote = await db.get("votes", { "user_id": current_user_id, "target_id": rant_id, "target_type": "rant" }) if vote == 0: # Remove vote if existing_vote: await db.delete("votes", {"id": existing_vote['id']}) # Update score await db.update("rants", { "score": rant['score'] - existing_vote['vote'] }, {"id": rant_id}) else: if existing_vote: # Update vote score_diff = vote - existing_vote['vote'] await db.update("votes", { "vote": vote, "reason": reason }, {"id": existing_vote['id']}) await db.update("rants", { "score": rant['score'] + score_diff }, {"id": rant_id}) else: # New vote await db.insert("votes", { "user_id": current_user_id, "target_id": rant_id, "target_type": "rant", "vote": vote, "reason": reason }, return_id=True) await db.update("rants", { "score": rant['score'] + vote }, {"id": rant_id}) # Update user score user = await db.get("users", {"id": rant['user_id']}) score_change = vote if vote != 0 else -existing_vote['vote'] if existing_vote else 0 await db.update("users", { "score": user['score'] + score_change }, {"id": rant['user_id']}) # Get updated rant with user info updated_rant = await db.query_one( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.id = ?""", (rant_id,) ) rant_data = { 'id': updated_rant['id'], 'text': updated_rant['text'], 'score': updated_rant['score'], 'created_time': updated_rant['created_time'], 'attached_image': updated_rant['attached_image'], 'tags': updated_rant['tags'], 'edited': updated_rant['edited'], 'type': updated_rant['type'] } user_data = { 'id': updated_rant['user_id'], 'username': updated_rant['username'], 'score': updated_rant['user_score'], 'avatar_b': updated_rant['avatar_b'], 'avatar_i': updated_rant['avatar_i'] } return { "success": True, "rant": await format_rant(rant_data, user_data, current_user_id) } @app.post("/api/rant/rants/{rant_id}/favorite") async def favorite_rant( rant_id: int, app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} try: await db.insert("favorites", { "user_id": current_user_id, "rant_id": rant_id }, return_id=True) return {"success": True} except Exception: return {"success": False, "error": "Already favorited"} @app.post("/api/rant/rants/{rant_id}/unfavorite") async def unfavorite_rant( rant_id: int, app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} await db.delete("favorites", { "user_id": current_user_id, "rant_id": rant_id }) return {"success": True} @app.post("/api/rant/rants/{rant_id}/comments") async def create_comment( rant_id: int, comment: str = Form(...), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...), image: Optional[UploadFile] = File(None) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "confirmed": False} # Check if rant exists if not await db.exists("rants", {"id": rant_id}): return {"success": False, "error": "Rant not found"} # Handle image upload image_path = "" if image: file_ext = os.path.splitext(image.filename)[1] file_name = f"{secrets.token_hex(16)}{file_ext}" file_path = UPLOAD_DIR / file_name with open(file_path, "wb") as f: f.write(await image.read()) image_path = f"/uploads/{file_name}" # Create comment created_time = int(datetime.now().timestamp()) comment_id = await db.insert("comments", { "rant_id": rant_id, "user_id": current_user_id, "body": comment, "created_time": created_time, "attached_image": image_path }, return_id=True) # Create notification for rant owner rant = await db.get("rants", {"id": rant_id}) if rant and rant['user_id'] != current_user_id: await db.insert("notifications", { "user_id": rant['user_id'], "type": "comment", "rant_id": rant_id, "comment_id": comment_id, "from_user_id": current_user_id, "created_time": created_time }, return_id=True) return {"success": True} @app.get("/api/comments/{comment_id}") async def get_comment( comment_id: int, app: int = 3, token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None row = await db.query_one( """SELECT c.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM comments c JOIN users u ON c.user_id = u.id WHERE c.id = ?""", (comment_id,) ) if not row: return {"success": False, "error": "Invalid comment specified in path."} comment_data = { 'id': row['id'], 'rant_id': row['rant_id'], 'body': row['body'], 'score': row['score'], 'created_time': row['created_time'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } return { "success": True, "comment": await format_comment(comment_data, user_data, current_user_id) } @app.post("/api/comments/{comment_id}") async def update_comment( comment_id: int, comment: str = Form(...), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Check ownership comment_row = await db.get("comments", {"id": comment_id}) if not comment_row: return {"success": False, "error": "Invalid comment specified in path."} if comment_row['user_id'] != current_user_id: return {"success": False, "fail_reason": "Unauthorized"} # Update comment await db.update("comments", { "body": comment, "edited": 1 }, {"id": comment_id}) return {"success": True} @app.delete("/api/comments/{comment_id}") async def delete_comment( comment_id: int, app: int = 3, token_id: int = None, token_key: str = None, user_id: int = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Check ownership comment_row = await db.get("comments", {"id": comment_id}) if not comment_row: return {"success": False, "error": "Invalid comment specified in path."} if comment_row['user_id'] != current_user_id: return {"success": False, "error": "Unauthorized"} # Delete comment and related data await db.delete("votes", {"target_id": comment_id, "target_type": "comment"}) await db.delete("comments", {"id": comment_id}) return {"success": True} @app.post("/api/comments/{comment_id}/vote") async def vote_comment( comment_id: int, vote: int = Form(...), reason: Optional[int] = Form(None), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Get comment comment = await db.get("comments", {"id": comment_id}) if not comment: return {"success": False, "error": "Invalid comment specified in path."} # Check for existing vote existing_vote = await db.get("votes", { "user_id": current_user_id, "target_id": comment_id, "target_type": "comment" }) if vote == 0: # Remove vote if existing_vote: await db.delete("votes", {"id": existing_vote['id']}) # Update score await db.update("comments", { "score": comment['score'] - existing_vote['vote'] }, {"id": comment_id}) else: if existing_vote: # Update vote score_diff = vote - existing_vote['vote'] await db.update("votes", { "vote": vote, "reason": reason }, {"id": existing_vote['id']}) await db.update("comments", { "score": comment['score'] + score_diff }, {"id": comment_id}) else: # New vote await db.insert("votes", { "user_id": current_user_id, "target_id": comment_id, "target_type": "comment", "vote": vote, "reason": reason }, return_id=True) await db.update("comments", { "score": comment['score'] + vote }, {"id": comment_id}) return {"success": True} @app.get("/api/users/{user_id}") async def get_profile( user_id: int, app: int = 3, token_id: Optional[int] = None, token_key: Optional[str] = None, auth_user_id: Optional[int] = None ): current_user_id = await authenticate_user(token_id, token_key, auth_user_id) if token_id else None # Get user user = await db.get("users", {"id": user_id}) if not user: return {"success": False, "error": "User not found"} # Get user's rants rant_rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.user_id = ? ORDER BY r.created_time DESC LIMIT 50""", (user_id,) ) rants = [] for row in rant_rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } rants.append(await format_rant(rant_data, user_data, current_user_id)) # Get user's comments comment_rows = await db.query_raw( """SELECT c.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM comments c JOIN users u ON c.user_id = u.id WHERE c.user_id = ? ORDER BY c.created_time DESC LIMIT 50""", (user_id,) ) comments = [] for row in comment_rows: comment_data = { 'id': row['id'], 'rant_id': row['rant_id'], 'body': row['body'], 'score': row['score'], 'created_time': row['created_time'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } comments.append(await format_comment(comment_data, user_data, current_user_id)) # Get favorited rants favorite_rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id JOIN favorites f ON f.rant_id = r.id WHERE f.user_id = ? ORDER BY f.id DESC LIMIT 50""", (user_id,) ) favorites = [] for row in favorite_rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } favorites.append(await format_rant(rant_data, user_data, current_user_id)) return { "success": True, "profile": { "username": user['username'], "score": user['score'], "about": user['about'], "location": user['location'], "created_time": user['created_time'], "skills": user['skills'], "github": user['github'], "website": user['website'], "avatar": { "b": user['avatar_b'], "i": user['avatar_i'] or "" }, "content": { "content": { "rants": rants, "comments": comments, "favorites": favorites } } } } @app.get("/api/get-user-id") async def get_user_id( username: str, app: int = 3 ): user = await db.get("users", {"username": username}) if not user: return {"success": False, "error": "User not found"} return {"success": True, "user_id": user['id']} @app.get("/api/rant/search") async def search( term: str, app: int = 3, token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None ): current_user_id = await authenticate_user(token_id, token_key, user_id) if token_id else None # Search rants rows = await db.query_raw( """SELECT r.*, u.id as user_id, u.username, u.score as user_score, u.avatar_b, u.avatar_i FROM rants r JOIN users u ON r.user_id = u.id WHERE r.text LIKE ? OR r.tags LIKE ? ORDER BY r.score DESC LIMIT 50""", (f'%{term}%', f'%{term}%') ) results = [] for row in rows: rant_data = { 'id': row['id'], 'text': row['text'], 'score': row['score'], 'created_time': row['created_time'], 'attached_image': row['attached_image'], 'tags': row['tags'], 'edited': row['edited'], 'type': row['type'] } user_data = { 'id': row['user_id'], 'username': row['username'], 'score': row['user_score'], 'avatar_b': row['avatar_b'], 'avatar_i': row['avatar_i'] } results.append(await format_rant(rant_data, user_data, current_user_id)) return {"success": True, "results": results} @app.get("/api/users/me/notif-feed") async def get_notifications( ext_prof: int = 1, last_time: Optional[int] = None, app: int = 3, token_id: Optional[int] = None, token_key: Optional[str] = None, user_id: Optional[int] = None ): # Use the generic authenticate_user function current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # Get notifications rows = await db.query_raw( """SELECT n.*, u.username FROM notifications n LEFT JOIN users u ON n.from_user_id = u.id WHERE n.user_id = ? ORDER BY n.created_time DESC LIMIT 50""", (current_user_id,) ) items = [] unread_count = 0 for row in rows: item = { "type": row['type'], "rant_id": row['rant_id'], "comment_id": row['comment_id'], "created_time": row['created_time'], "read": row['read'], "uid": row['from_user_id'], "username": row['username'] or "" } items.append(item) if not row['read']: unread_count += 1 # Mark notifications as read if rows: # Only update if there are notifications await db.update("notifications", {"read": 1}, {"user_id": current_user_id}) return { "success": True, "data": { "items": items, "check_time": int(datetime.now().timestamp()), "username_map": [], "unread": { "all": unread_count, "upvotes": 0, "mentions": 0, "comments": unread_count, "subs": 0, "total": unread_count }, "num_unread": unread_count } } @app.delete("/api/users/me/notif-feed") async def clear_notifications( app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} await db.delete("notifications", {"user_id": current_user_id}) return {"success": True} @app.post("/api/users/me/edit-profile") async def edit_profile( profile_about: str = Form(""), profile_skills: str = Form(""), profile_location: str = Form(""), profile_website: str = Form(""), profile_github: str = Form(""), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} await db.update("users", { "about": profile_about, "skills": profile_skills, "location": profile_location, "website": profile_website, "github": profile_github }, {"id": current_user_id}) return {"success": True} @app.post("/api/users/forgot-password") async def forgot_password( username: str = Form(...), app: int = Form(3) ): # In a real implementation, this would send an email return {"success": True} @app.post("/api/users/me/resend-confirm") async def resend_confirmation( app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # In a real implementation, this would send an email return {"success": True} @app.post("/api/users/me/mark-news-read") async def mark_news_read( news_id: str = Form(...), app: int = Form(3), token_id: int = Form(...), token_key: str = Form(...), user_id: int = Form(...) ): current_user_id = await authenticate_user(token_id, token_key, user_id) if not current_user_id: return {"success": False, "error": "Authentication required"} # In a real implementation, this would mark news as read return {"success": True} # Serve uploaded files @app.get("/uploads/{filename}") async def get_upload(filename: str): file_path = UPLOAD_DIR / filename if file_path.exists(): return FileResponse(file_path) raise HTTPException(status_code=404, detail="File not found") # Create static directory if it doesn't exist Path("static").mkdir(exist_ok=True) # Serve static files (for frontend) app.mount("/static", StaticFiles(directory="static"), name="static") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8111)