|
# backend.py
|
|
from fastapi import FastAPI, HTTPException, Depends, Form, File, UploadFile
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import FileResponse
|
|
from pydantic import BaseModel, EmailStr
|
|
from typing import Optional, List, Dict, Any, Literal
|
|
from datetime import datetime, timedelta
|
|
import hashlib
|
|
import secrets
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
|
|
app = FastAPI(title="DevRant Community API")
|
|
|
|
# Enable CORS
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Database setup
|
|
DB_PATH = "devrant_community.db"
|
|
UPLOAD_DIR = Path("uploads")
|
|
UPLOAD_DIR.mkdir(exist_ok=True)
|
|
|
|
def init_db():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
c = conn.cursor()
|
|
|
|
# Users table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
email TEXT UNIQUE NOT NULL,
|
|
password_hash TEXT NOT NULL,
|
|
score INTEGER DEFAULT 0,
|
|
about TEXT DEFAULT '',
|
|
location TEXT DEFAULT '',
|
|
skills TEXT DEFAULT '',
|
|
github TEXT DEFAULT '',
|
|
website TEXT DEFAULT '',
|
|
created_time INTEGER NOT NULL,
|
|
avatar_b TEXT DEFAULT '7bc8a4',
|
|
avatar_i TEXT
|
|
)''')
|
|
|
|
# Auth tokens table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS auth_tokens (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
token_key TEXT UNIQUE NOT NULL,
|
|
expire_time INTEGER NOT NULL,
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)''')
|
|
|
|
# Rants table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS rants (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
text TEXT NOT NULL,
|
|
score INTEGER DEFAULT 0,
|
|
created_time INTEGER NOT NULL,
|
|
attached_image TEXT DEFAULT '',
|
|
tags TEXT DEFAULT '',
|
|
edited BOOLEAN DEFAULT 0,
|
|
type INTEGER DEFAULT 1,
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)''')
|
|
|
|
# Comments table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS comments (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
rant_id INTEGER NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
body TEXT NOT NULL,
|
|
score INTEGER DEFAULT 0,
|
|
created_time INTEGER NOT NULL,
|
|
attached_image TEXT DEFAULT '',
|
|
edited BOOLEAN DEFAULT 0,
|
|
FOREIGN KEY (rant_id) REFERENCES rants (id),
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)''')
|
|
|
|
# Votes table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS votes (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
target_id INTEGER NOT NULL,
|
|
target_type TEXT NOT NULL,
|
|
vote INTEGER NOT NULL,
|
|
reason INTEGER,
|
|
UNIQUE(user_id, target_id, target_type),
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)''')
|
|
|
|
# Favorites table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS favorites (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
rant_id INTEGER NOT NULL,
|
|
UNIQUE(user_id, rant_id),
|
|
FOREIGN KEY (user_id) REFERENCES users (id),
|
|
FOREIGN KEY (rant_id) REFERENCES rants (id)
|
|
)''')
|
|
|
|
# Notifications table
|
|
c.execute('''CREATE TABLE IF NOT EXISTS notifications (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
user_id INTEGER NOT NULL,
|
|
type TEXT NOT NULL,
|
|
rant_id INTEGER,
|
|
comment_id INTEGER,
|
|
from_user_id INTEGER,
|
|
created_time INTEGER NOT NULL,
|
|
read BOOLEAN DEFAULT 0,
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)''')
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
init_db()
|
|
|
|
# Pydantic models
|
|
class UserRegister(BaseModel):
|
|
email: EmailStr
|
|
username: str
|
|
password: str
|
|
|
|
class UserLogin(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
class RantCreate(BaseModel):
|
|
rant: str
|
|
tags: str
|
|
type: int = 1
|
|
|
|
class CommentCreate(BaseModel):
|
|
comment: str
|
|
|
|
class VoteRequest(BaseModel):
|
|
vote: Literal[-1, 0, 1]
|
|
reason: Optional[int] = None
|
|
|
|
# Helper functions
|
|
def get_db():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def hash_password(password: str) -> str:
|
|
return hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
def generate_token() -> str:
|
|
return secrets.token_urlsafe(32)
|
|
|
|
def get_current_user(token_id: Optional[int] = Form(None),
|
|
token_key: Optional[str] = Form(None),
|
|
user_id: Optional[int] = Form(None)):
|
|
if not all([token_id, token_key, user_id]):
|
|
return None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('''SELECT * FROM auth_tokens
|
|
WHERE id = ? AND token_key = ? AND user_id = ? AND expire_time > ?''',
|
|
(token_id, token_key, user_id, int(datetime.now().timestamp())))
|
|
|
|
token = c.fetchone()
|
|
conn.close()
|
|
|
|
if not token:
|
|
return None
|
|
|
|
return user_id
|
|
|
|
def format_rant(rant_row, user_row, current_user_id=None):
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get comment count
|
|
c.execute('SELECT COUNT(*) as count FROM comments WHERE rant_id = ?', (rant_row['id'],))
|
|
comment_count = c.fetchone()['count']
|
|
|
|
# Get vote state for current user
|
|
vote_state = 0
|
|
if current_user_id:
|
|
c.execute('SELECT vote FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
|
|
(current_user_id, rant_row['id'], 'rant'))
|
|
vote = c.fetchone()
|
|
if vote:
|
|
vote_state = vote['vote']
|
|
|
|
conn.close()
|
|
|
|
tags = json.loads(rant_row['tags']) if rant_row['tags'] else []
|
|
|
|
return {
|
|
"id": rant_row['id'],
|
|
"text": rant_row['text'],
|
|
"score": rant_row['score'],
|
|
"created_time": rant_row['created_time'],
|
|
"attached_image": rant_row['attached_image'],
|
|
"num_comments": comment_count,
|
|
"tags": tags,
|
|
"vote_state": vote_state,
|
|
"edited": bool(rant_row['edited']),
|
|
"rt": rant_row['type'],
|
|
"rc": 1,
|
|
"user_id": user_row['id'],
|
|
"user_username": user_row['username'],
|
|
"user_score": user_row['score'],
|
|
"user_avatar": {
|
|
"b": user_row['avatar_b'],
|
|
"i": user_row['avatar_i'] or ""
|
|
},
|
|
"user_avatar_lg": {
|
|
"b": user_row['avatar_b'],
|
|
"i": user_row['avatar_i'] or ""
|
|
}
|
|
}
|
|
|
|
def format_comment(comment_row, user_row, current_user_id=None):
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get vote state for current user
|
|
vote_state = 0
|
|
if current_user_id:
|
|
c.execute('SELECT vote FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
|
|
(current_user_id, comment_row['id'], 'comment'))
|
|
vote = c.fetchone()
|
|
if vote:
|
|
vote_state = vote['vote']
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"id": comment_row['id'],
|
|
"rant_id": comment_row['rant_id'],
|
|
"body": comment_row['body'],
|
|
"score": comment_row['score'],
|
|
"created_time": comment_row['created_time'],
|
|
"vote_state": vote_state,
|
|
"user_id": user_row['id'],
|
|
"user_username": user_row['username'],
|
|
"user_score": user_row['score'],
|
|
"user_avatar": {
|
|
"b": user_row['avatar_b'],
|
|
"i": user_row['avatar_i'] or ""
|
|
}
|
|
}
|
|
|
|
# Endpoints
|
|
@app.post("/api/users")
|
|
async def register_user(
|
|
email: str = Form(...),
|
|
username: str = Form(...),
|
|
password: str = Form(...),
|
|
type: int = Form(1),
|
|
app: int = Form(3)
|
|
):
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Validate username length
|
|
if len(username) < 4 or len(username) > 15:
|
|
conn.close()
|
|
return {
|
|
"success": False,
|
|
"error": "Your username must be between 4 and 15 characters.",
|
|
"error_field": "username"
|
|
}
|
|
|
|
# Check if username exists
|
|
c.execute('SELECT id FROM users WHERE username = ?', (username,))
|
|
if c.fetchone():
|
|
conn.close()
|
|
return {
|
|
"success": False,
|
|
"error": "Username already taken.",
|
|
"error_field": "username"
|
|
}
|
|
|
|
# Check if email exists
|
|
c.execute('SELECT id FROM users WHERE email = ?', (email,))
|
|
if c.fetchone():
|
|
conn.close()
|
|
return {
|
|
"success": False,
|
|
"error": "Email already registered.",
|
|
"error_field": "email"
|
|
}
|
|
|
|
# Create user
|
|
password_hash = hash_password(password)
|
|
created_time = int(datetime.now().timestamp())
|
|
|
|
try:
|
|
c.execute('''INSERT INTO users (username, email, password_hash, created_time)
|
|
VALUES (?, ?, ?, ?)''',
|
|
(username, email, password_hash, created_time))
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True}
|
|
except Exception as e:
|
|
conn.close()
|
|
return {"success": False, "error": str(e)}
|
|
|
|
@app.post("/api/users/auth-token")
|
|
async def login(
|
|
username: str = Form(...),
|
|
password: str = Form(...),
|
|
app: int = Form(3)
|
|
):
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Find user
|
|
c.execute('SELECT * FROM users WHERE username = ? OR email = ?', (username, username))
|
|
user = c.fetchone()
|
|
|
|
if not user or user['password_hash'] != hash_password(password):
|
|
conn.close()
|
|
return {
|
|
"success": False,
|
|
"error": "Invalid login credentials entered. Please try again."
|
|
}
|
|
|
|
# Create auth token
|
|
token_key = generate_token()
|
|
expire_time = int((datetime.now() + timedelta(days=30)).timestamp())
|
|
|
|
c.execute('''INSERT INTO auth_tokens (user_id, token_key, expire_time)
|
|
VALUES (?, ?, ?)''',
|
|
(user['id'], token_key, expire_time))
|
|
|
|
token_id = c.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"auth_token": {
|
|
"id": token_id,
|
|
"key": token_key,
|
|
"expire_time": expire_time,
|
|
"user_id": user['id']
|
|
}
|
|
}
|
|
|
|
@app.get("/api/devrant/rants")
|
|
async def get_rants(
|
|
sort: str = "recent",
|
|
limit: int = 20,
|
|
skip: int = 0,
|
|
app: int = 3,
|
|
token_id: Optional[int] = None,
|
|
token_key: Optional[str] = None,
|
|
user_id: Optional[int] = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get rants
|
|
order_by = "created_time DESC" if sort == "recent" else "score DESC"
|
|
c.execute(f'''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
ORDER BY r.{order_by}
|
|
LIMIT ? OFFSET ?''', (limit, skip))
|
|
|
|
rants = []
|
|
for row in c.fetchall():
|
|
rant_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
rants.append(format_rant(rant_data, user_data, current_user_id))
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"rants": rants,
|
|
"settings": {"notif_state": -1, "notif_token": ""},
|
|
"set": secrets.token_hex(7),
|
|
"wrw": 385,
|
|
"dpp": 0,
|
|
"num_notifs": 0,
|
|
"unread": {"total": 0},
|
|
"news": {
|
|
"id": 356,
|
|
"type": "intlink",
|
|
"headline": "Weekly Group Rant",
|
|
"body": "Share your thoughts!",
|
|
"footer": "Add tag 'wk247' to your rant",
|
|
"height": 100,
|
|
"action": "grouprant"
|
|
}
|
|
}
|
|
|
|
@app.get("/api/devrant/rants/{rant_id}")
|
|
async def get_rant(
|
|
rant_id: int,
|
|
app: int = 3,
|
|
last_comment_id: Optional[int] = None,
|
|
token_id: Optional[int] = None,
|
|
token_key: Optional[str] = None,
|
|
user_id: Optional[int] = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get rant
|
|
c.execute('''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
WHERE r.id = ?''', (rant_id,))
|
|
|
|
rant_row = c.fetchone()
|
|
if not rant_row:
|
|
conn.close()
|
|
raise HTTPException(status_code=404, detail="Rant not found")
|
|
|
|
rant_data = {col: rant_row[col] for col in rant_row.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: rant_row[col] for col in rant_row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = rant_row['user_id']
|
|
user_data['score'] = rant_row['score']
|
|
|
|
rant = format_rant(rant_data, user_data, current_user_id)
|
|
|
|
# Get comments
|
|
c.execute('''SELECT c.*, u.* FROM comments c
|
|
JOIN users u ON c.user_id = u.id
|
|
WHERE c.rant_id = ?
|
|
ORDER BY c.created_time ASC''', (rant_id,))
|
|
|
|
comments = []
|
|
for row in c.fetchall():
|
|
comment_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'rant_id', 'body', 'score', 'created_time']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
comments.append(format_comment(comment_data, user_data, current_user_id))
|
|
|
|
# Check if subscribed (favorited)
|
|
subscribed = 0
|
|
if current_user_id:
|
|
c.execute('SELECT id FROM favorites WHERE user_id = ? AND rant_id = ?',
|
|
(current_user_id, rant_id))
|
|
if c.fetchone():
|
|
subscribed = 1
|
|
|
|
conn.close()
|
|
|
|
# Add link to rant
|
|
rant['link'] = f"rants/{rant_id}/{rant['text'][:50].replace(' ', '-')}"
|
|
|
|
return {
|
|
"rant": rant,
|
|
"comments": comments,
|
|
"success": True,
|
|
"subscribed": subscribed
|
|
}
|
|
|
|
@app.post("/api/devrant/rants")
|
|
async def create_rant(
|
|
rant: str = Form(...),
|
|
tags: str = Form(...),
|
|
type: int = Form(1),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...),
|
|
image: Optional[UploadFile] = File(None)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check for duplicate rant
|
|
c.execute('''SELECT id FROM rants
|
|
WHERE user_id = ? AND text = ? AND created_time > ?''',
|
|
(current_user_id, rant, int(datetime.now().timestamp()) - 300))
|
|
|
|
if c.fetchone():
|
|
conn.close()
|
|
return {
|
|
"success": False,
|
|
"error": "It looks like you just posted this same rant! Your connection might have timed out while posting so you might have seen an error, but sometimes the rant still gets posted and in this case it seems it did, so please check :) If this was not the case please contact info@devrant.io. Thanks!"
|
|
}
|
|
|
|
# Handle image upload
|
|
image_path = ""
|
|
if image:
|
|
file_ext = os.path.splitext(image.filename)[1]
|
|
file_name = f"{secrets.token_hex(16)}{file_ext}"
|
|
file_path = UPLOAD_DIR / file_name
|
|
|
|
with open(file_path, "wb") as f:
|
|
f.write(await image.read())
|
|
|
|
image_path = f"/uploads/{file_name}"
|
|
|
|
# Create rant
|
|
created_time = int(datetime.now().timestamp())
|
|
tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
|
|
|
|
c.execute('''INSERT INTO rants (user_id, text, created_time, attached_image, tags, type)
|
|
VALUES (?, ?, ?, ?, ?, ?)''',
|
|
(current_user_id, rant, created_time, image_path, json.dumps(tags_list), type))
|
|
|
|
rant_id = c.lastrowid
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True, "rant_id": rant_id}
|
|
|
|
@app.post("/api/devrant/rants/{rant_id}")
|
|
async def update_rant(
|
|
rant_id: int,
|
|
rant: str = Form(...),
|
|
tags: str = Form(...),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check ownership
|
|
c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
|
|
rant_row = c.fetchone()
|
|
|
|
if not rant_row or rant_row['user_id'] != current_user_id:
|
|
conn.close()
|
|
return {"success": False, "fail_reason": "Unauthorized"}
|
|
|
|
# Update rant
|
|
tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
|
|
|
|
c.execute('''UPDATE rants SET text = ?, tags = ?, edited = 1
|
|
WHERE id = ?''',
|
|
(rant, json.dumps(tags_list), rant_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.delete("/api/devrant/rants/{rant_id}")
|
|
async def delete_rant(
|
|
rant_id: int,
|
|
app: int = 3,
|
|
token_id: int = None,
|
|
token_key: str = None,
|
|
user_id: int = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check ownership
|
|
c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
|
|
rant_row = c.fetchone()
|
|
|
|
if not rant_row:
|
|
conn.close()
|
|
return {"success": False, "error": "Rant not found"}
|
|
|
|
if rant_row['user_id'] != current_user_id:
|
|
conn.close()
|
|
return {"success": False, "error": "Unauthorized"}
|
|
|
|
# Delete rant and related data
|
|
c.execute('DELETE FROM comments WHERE rant_id = ?', (rant_id,))
|
|
c.execute('DELETE FROM votes WHERE target_id = ? AND target_type = ?', (rant_id, 'rant'))
|
|
c.execute('DELETE FROM favorites WHERE rant_id = ?', (rant_id,))
|
|
c.execute('DELETE FROM rants WHERE id = ?', (rant_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.post("/api/devrant/rants/{rant_id}/vote")
|
|
async def vote_rant(
|
|
rant_id: int,
|
|
vote: int = Form(...),
|
|
reason: Optional[int] = Form(None),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get rant
|
|
c.execute('SELECT * FROM rants WHERE id = ?', (rant_id,))
|
|
rant = c.fetchone()
|
|
|
|
if not rant:
|
|
conn.close()
|
|
return {"success": False, "error": "Rant not found"}
|
|
|
|
# Check for existing vote
|
|
c.execute('SELECT * FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
|
|
(current_user_id, rant_id, 'rant'))
|
|
existing_vote = c.fetchone()
|
|
|
|
if vote == 0:
|
|
# Remove vote
|
|
if existing_vote:
|
|
c.execute('DELETE FROM votes WHERE id = ?', (existing_vote['id'],))
|
|
# Update score
|
|
c.execute('UPDATE rants SET score = score - ? WHERE id = ?',
|
|
(existing_vote['vote'], rant_id))
|
|
else:
|
|
if existing_vote:
|
|
# Update vote
|
|
score_diff = vote - existing_vote['vote']
|
|
c.execute('UPDATE votes SET vote = ?, reason = ? WHERE id = ?',
|
|
(vote, reason, existing_vote['id']))
|
|
c.execute('UPDATE rants SET score = score + ? WHERE id = ?',
|
|
(score_diff, rant_id))
|
|
else:
|
|
# New vote
|
|
c.execute('''INSERT INTO votes (user_id, target_id, target_type, vote, reason)
|
|
VALUES (?, ?, ?, ?, ?)''',
|
|
(current_user_id, rant_id, 'rant', vote, reason))
|
|
c.execute('UPDATE rants SET score = score + ? WHERE id = ?',
|
|
(vote, rant_id))
|
|
|
|
# Update user score
|
|
c.execute('UPDATE users SET score = score + ? WHERE id = ?',
|
|
(vote if vote != 0 else -existing_vote['vote'] if existing_vote else 0,
|
|
rant['user_id']))
|
|
|
|
conn.commit()
|
|
|
|
# Get updated rant
|
|
c.execute('''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
WHERE r.id = ?''', (rant_id,))
|
|
updated_rant = c.fetchone()
|
|
|
|
conn.close()
|
|
|
|
rant_data = {col: updated_rant[col] for col in updated_rant.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: updated_rant[col] for col in updated_rant.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = updated_rant['user_id']
|
|
user_data['score'] = updated_rant['score']
|
|
|
|
return {
|
|
"success": True,
|
|
"rant": format_rant(rant_data, user_data, current_user_id)
|
|
}
|
|
|
|
@app.post("/api/devrant/rants/{rant_id}/favorite")
|
|
async def favorite_rant(
|
|
rant_id: int,
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
try:
|
|
c.execute('INSERT INTO favorites (user_id, rant_id) VALUES (?, ?)',
|
|
(current_user_id, rant_id))
|
|
conn.commit()
|
|
conn.close()
|
|
return {"success": True}
|
|
except sqlite3.IntegrityError:
|
|
conn.close()
|
|
return {"success": False, "error": "Already favorited"}
|
|
|
|
@app.post("/api/devrant/rants/{rant_id}/unfavorite")
|
|
async def unfavorite_rant(
|
|
rant_id: int,
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('DELETE FROM favorites WHERE user_id = ? AND rant_id = ?',
|
|
(current_user_id, rant_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.post("/api/devrant/rants/{rant_id}/comments")
|
|
async def create_comment(
|
|
rant_id: int,
|
|
comment: str = Form(...),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...),
|
|
image: Optional[UploadFile] = File(None)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "confirmed": False}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check if rant exists
|
|
c.execute('SELECT id FROM rants WHERE id = ?', (rant_id,))
|
|
if not c.fetchone():
|
|
conn.close()
|
|
return {"success": False, "error": "Rant not found"}
|
|
|
|
# Handle image upload
|
|
image_path = ""
|
|
if image:
|
|
file_ext = os.path.splitext(image.filename)[1]
|
|
file_name = f"{secrets.token_hex(16)}{file_ext}"
|
|
file_path = UPLOAD_DIR / file_name
|
|
|
|
with open(file_path, "wb") as f:
|
|
f.write(await image.read())
|
|
|
|
image_path = f"/uploads/{file_name}"
|
|
|
|
# Create comment
|
|
created_time = int(datetime.now().timestamp())
|
|
|
|
c.execute('''INSERT INTO comments (rant_id, user_id, body, created_time, attached_image)
|
|
VALUES (?, ?, ?, ?, ?)''',
|
|
(rant_id, current_user_id, comment, created_time, image_path))
|
|
|
|
comment_id = c.lastrowid
|
|
|
|
# Create notification for rant owner
|
|
c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
|
|
rant_owner = c.fetchone()
|
|
|
|
if rant_owner and rant_owner['user_id'] != current_user_id:
|
|
c.execute('''INSERT INTO notifications (user_id, type, rant_id, comment_id, from_user_id, created_time)
|
|
VALUES (?, ?, ?, ?, ?, ?)''',
|
|
(rant_owner['user_id'], 'comment', rant_id, comment_id, current_user_id, created_time))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.get("/api/comments/{comment_id}")
|
|
async def get_comment(
|
|
comment_id: int,
|
|
app: int = 3,
|
|
token_id: Optional[int] = None,
|
|
token_key: Optional[str] = None,
|
|
user_id: Optional[int] = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('''SELECT c.*, u.* FROM comments c
|
|
JOIN users u ON c.user_id = u.id
|
|
WHERE c.id = ?''', (comment_id,))
|
|
|
|
row = c.fetchone()
|
|
if not row:
|
|
conn.close()
|
|
return {"success": False, "error": "Invalid comment specified in path."}
|
|
|
|
comment_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'rant_id', 'body', 'score', 'created_time']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"comment": format_comment(comment_data, user_data, current_user_id)
|
|
}
|
|
|
|
@app.post("/api/comments/{comment_id}")
|
|
async def update_comment(
|
|
comment_id: int,
|
|
comment: str = Form(...),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check ownership
|
|
c.execute('SELECT user_id FROM comments WHERE id = ?', (comment_id,))
|
|
comment_row = c.fetchone()
|
|
|
|
if not comment_row:
|
|
conn.close()
|
|
return {"success": False, "error": "Invalid comment specified in path."}
|
|
|
|
if comment_row['user_id'] != current_user_id:
|
|
conn.close()
|
|
return {"success": False, "fail_reason": "Unauthorized"}
|
|
|
|
# Update comment
|
|
c.execute('UPDATE comments SET body = ?, edited = 1 WHERE id = ?',
|
|
(comment, comment_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.delete("/api/comments/{comment_id}")
|
|
async def delete_comment(
|
|
comment_id: int,
|
|
app: int = 3,
|
|
token_id: int = None,
|
|
token_key: str = None,
|
|
user_id: int = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Check ownership
|
|
c.execute('SELECT user_id FROM comments WHERE id = ?', (comment_id,))
|
|
comment_row = c.fetchone()
|
|
|
|
if not comment_row:
|
|
conn.close()
|
|
return {"success": False, "error": "Invalid comment specified in path."}
|
|
|
|
if comment_row['user_id'] != current_user_id:
|
|
conn.close()
|
|
return {"success": False, "error": "Unauthorized"}
|
|
|
|
# Delete comment and related data
|
|
c.execute('DELETE FROM votes WHERE target_id = ? AND target_type = ?', (comment_id, 'comment'))
|
|
c.execute('DELETE FROM comments WHERE id = ?', (comment_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.post("/api/comments/{comment_id}/vote")
|
|
async def vote_comment(
|
|
comment_id: int,
|
|
vote: int = Form(...),
|
|
reason: Optional[int] = Form(None),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get comment
|
|
c.execute('SELECT * FROM comments WHERE id = ?', (comment_id,))
|
|
comment = c.fetchone()
|
|
|
|
if not comment:
|
|
conn.close()
|
|
return {"success": False, "error": "Invalid comment specified in path."}
|
|
|
|
# Check for existing vote
|
|
c.execute('SELECT * FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
|
|
(current_user_id, comment_id, 'comment'))
|
|
existing_vote = c.fetchone()
|
|
|
|
if vote == 0:
|
|
# Remove vote
|
|
if existing_vote:
|
|
c.execute('DELETE FROM votes WHERE id = ?', (existing_vote['id'],))
|
|
# Update score
|
|
c.execute('UPDATE comments SET score = score - ? WHERE id = ?',
|
|
(existing_vote['vote'], comment_id))
|
|
else:
|
|
if existing_vote:
|
|
# Update vote
|
|
score_diff = vote - existing_vote['vote']
|
|
c.execute('UPDATE votes SET vote = ?, reason = ? WHERE id = ?',
|
|
(vote, reason, existing_vote['id']))
|
|
c.execute('UPDATE comments SET score = score + ? WHERE id = ?',
|
|
(score_diff, comment_id))
|
|
else:
|
|
# New vote
|
|
c.execute('''INSERT INTO votes (user_id, target_id, target_type, vote, reason)
|
|
VALUES (?, ?, ?, ?, ?)''',
|
|
(current_user_id, comment_id, 'comment', vote, reason))
|
|
c.execute('UPDATE comments SET score = score + ? WHERE id = ?',
|
|
(vote, comment_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.get("/api/users/{user_id}")
|
|
async def get_profile(
|
|
user_id: int,
|
|
app: int = 3,
|
|
token_id: Optional[int] = None,
|
|
token_key: Optional[str] = None,
|
|
auth_user_id: Optional[int] = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, auth_user_id) if token_id else None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get user
|
|
c.execute('SELECT * FROM users WHERE id = ?', (user_id,))
|
|
user = c.fetchone()
|
|
|
|
if not user:
|
|
conn.close()
|
|
return {"success": False, "error": "User not found"}
|
|
|
|
# Get user's rants
|
|
c.execute('''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
WHERE r.user_id = ?
|
|
ORDER BY r.created_time DESC
|
|
LIMIT 50''', (user_id,))
|
|
|
|
rants = []
|
|
for row in c.fetchall():
|
|
rant_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
rants.append(format_rant(rant_data, user_data, current_user_id))
|
|
|
|
# Get user's comments
|
|
c.execute('''SELECT c.*, u.* FROM comments c
|
|
JOIN users u ON c.user_id = u.id
|
|
WHERE c.user_id = ?
|
|
ORDER BY c.created_time DESC
|
|
LIMIT 50''', (user_id,))
|
|
|
|
comments = []
|
|
for row in c.fetchall():
|
|
comment_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'rant_id', 'body', 'score', 'created_time']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
comments.append(format_comment(comment_data, user_data, current_user_id))
|
|
|
|
# Get favorited rants
|
|
c.execute('''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
JOIN favorites f ON f.rant_id = r.id
|
|
WHERE f.user_id = ?
|
|
ORDER BY f.id DESC
|
|
LIMIT 50''', (user_id,))
|
|
|
|
favorites = []
|
|
for row in c.fetchall():
|
|
rant_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
favorites.append(format_rant(rant_data, user_data, current_user_id))
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"profile": {
|
|
"username": user['username'],
|
|
"score": user['score'],
|
|
"about": user['about'],
|
|
"location": user['location'],
|
|
"created_time": user['created_time'],
|
|
"skills": user['skills'],
|
|
"github": user['github'],
|
|
"website": user['website'],
|
|
"avatar": {
|
|
"b": user['avatar_b'],
|
|
"i": user['avatar_i'] or ""
|
|
},
|
|
"content": {
|
|
"content": {
|
|
"rants": rants,
|
|
"comments": comments,
|
|
"favorites": favorites
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@app.get("/api/get-user-id")
|
|
async def get_user_id(
|
|
username: str,
|
|
app: int = 3
|
|
):
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('SELECT id FROM users WHERE username = ?', (username,))
|
|
user = c.fetchone()
|
|
|
|
conn.close()
|
|
|
|
if not user:
|
|
return {"success": False, "error": "User not found"}
|
|
|
|
return {"success": True, "user_id": user['id']}
|
|
|
|
@app.get("/api/devrant/search")
|
|
async def search(
|
|
term: str,
|
|
app: int = 3,
|
|
token_id: Optional[int] = None,
|
|
token_key: Optional[str] = None,
|
|
user_id: Optional[int] = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Search rants
|
|
c.execute('''SELECT r.*, u.* FROM rants r
|
|
JOIN users u ON r.user_id = u.id
|
|
WHERE r.text LIKE ? OR r.tags LIKE ?
|
|
ORDER BY r.score DESC
|
|
LIMIT 50''', (f'%{term}%', f'%{term}%'))
|
|
|
|
results = []
|
|
for row in c.fetchall():
|
|
rant_data = {col: row[col] for col in row.keys() if col in
|
|
['id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type']}
|
|
user_data = {col: row[col] for col in row.keys() if col in
|
|
['username', 'avatar_b', 'avatar_i']}
|
|
user_data['id'] = row['user_id']
|
|
user_data['score'] = row['score']
|
|
|
|
results.append(format_rant(rant_data, user_data, current_user_id))
|
|
|
|
conn.close()
|
|
|
|
return {"success": True, "results": results}
|
|
|
|
@app.get("/api/users/me/notif-feed")
|
|
async def get_notifications(
|
|
ext_prof: int = 1,
|
|
last_time: Optional[int] = None,
|
|
app: int = 3,
|
|
token_id: int = None,
|
|
token_key: str = None,
|
|
user_id: int = None
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
# Get notifications
|
|
c.execute('''SELECT n.*, u.username FROM notifications n
|
|
LEFT JOIN users u ON n.from_user_id = u.id
|
|
WHERE n.user_id = ?
|
|
ORDER BY n.created_time DESC
|
|
LIMIT 50''', (current_user_id,))
|
|
|
|
items = []
|
|
unread_count = 0
|
|
|
|
for row in c.fetchall():
|
|
item = {
|
|
"type": row['type'],
|
|
"rant_id": row['rant_id'],
|
|
"comment_id": row['comment_id'],
|
|
"created_time": row['created_time'],
|
|
"read": row['read'],
|
|
"uid": row['from_user_id'],
|
|
"username": row['username'] or ""
|
|
}
|
|
items.append(item)
|
|
|
|
if not row['read']:
|
|
unread_count += 1
|
|
|
|
# Mark as read
|
|
c.execute('UPDATE notifications SET read = 1 WHERE user_id = ?', (current_user_id,))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"data": {
|
|
"items": items,
|
|
"check_time": int(datetime.now().timestamp()),
|
|
"username_map": [],
|
|
"unread": {
|
|
"all": unread_count,
|
|
"upvotes": 0,
|
|
"mentions": 0,
|
|
"comments": unread_count,
|
|
"subs": 0,
|
|
"total": unread_count
|
|
},
|
|
"num_unread": unread_count
|
|
}
|
|
}
|
|
|
|
@app.delete("/api/users/me/notif-feed")
|
|
async def clear_notifications(
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('DELETE FROM notifications WHERE user_id = ?', (current_user_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.post("/api/users/me/edit-profile")
|
|
async def edit_profile(
|
|
profile_about: str = Form(""),
|
|
profile_skills: str = Form(""),
|
|
profile_location: str = Form(""),
|
|
profile_website: str = Form(""),
|
|
profile_github: str = Form(""),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
conn = get_db()
|
|
c = conn.cursor()
|
|
|
|
c.execute('''UPDATE users SET about = ?, skills = ?, location = ?, website = ?, github = ?
|
|
WHERE id = ?''',
|
|
(profile_about, profile_skills, profile_location, profile_website,
|
|
profile_github, current_user_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return {"success": True}
|
|
|
|
@app.post("/api/users/forgot-password")
|
|
async def forgot_password(
|
|
username: str = Form(...),
|
|
app: int = Form(3)
|
|
):
|
|
# In a real implementation, this would send an email
|
|
return {"success": True}
|
|
|
|
@app.post("/api/users/me/resend-confirm")
|
|
async def resend_confirmation(
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
# In a real implementation, this would send an email
|
|
return {"success": True}
|
|
|
|
@app.post("/api/users/me/mark-news-read")
|
|
async def mark_news_read(
|
|
news_id: str = Form(...),
|
|
app: int = Form(3),
|
|
token_id: int = Form(...),
|
|
token_key: str = Form(...),
|
|
user_id: int = Form(...)
|
|
):
|
|
current_user_id = get_current_user(token_id, token_key, user_id)
|
|
if not current_user_id:
|
|
return {"success": False, "error": "Authentication required"}
|
|
|
|
# In a real implementation, this would mark news as read
|
|
return {"success": True}
|
|
|
|
# Serve uploaded files
|
|
@app.get("/uploads/{filename}")
|
|
async def get_upload(filename: str):
|
|
file_path = UPLOAD_DIR / filename
|
|
if file_path.exists():
|
|
return FileResponse(file_path)
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
|
|
# Serve static files (for frontend)
|
|
from fastapi.staticfiles import StaticFiles
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
# Root endpoint serves the main HTML
|
|
@app.get("/")
|
|
async def root():
|
|
return FileResponse("static/index.html")
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|