# Listing metadata: 1295 lines, 40 KiB, Python; revision dated 2025-08-03 20:36:36 +02:00.
# backend.py
from fastapi import FastAPI, HTTPException, Depends, Form, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel, EmailStr
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime, timedelta
import hashlib
import secrets
import sqlite3
import json
import os
from pathlib import Path
# Application instance that every route below is registered on.
app = FastAPI(title="DevRant Community API")

# Enable CORS
# NOTE(review): every origin, method and header is allowed and credentials are
# enabled — confirm this wide-open policy is intended outside development.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Database setup
DB_PATH = "devrant_community.db"
UPLOAD_DIR = Path("uploads")
# Create the upload directory up front; an already existing one is fine.
UPLOAD_DIR.mkdir(exist_ok=True)
def init_db():
    """Create all tables used by the API if they do not exist yet.

    Opens its own connection to ``DB_PATH``, runs one ``CREATE TABLE IF NOT
    EXISTS`` statement per table, commits, and always closes the connection,
    even when one of the statements raises.
    """
    schema = (
        # Users table
        '''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            score INTEGER DEFAULT 0,
            about TEXT DEFAULT '',
            location TEXT DEFAULT '',
            skills TEXT DEFAULT '',
            github TEXT DEFAULT '',
            website TEXT DEFAULT '',
            created_time INTEGER NOT NULL,
            avatar_b TEXT DEFAULT '7bc8a4',
            avatar_i TEXT
        )''',
        # Auth tokens table
        '''CREATE TABLE IF NOT EXISTS auth_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            token_key TEXT UNIQUE NOT NULL,
            expire_time INTEGER NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
        # Rants table
        '''CREATE TABLE IF NOT EXISTS rants (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            text TEXT NOT NULL,
            score INTEGER DEFAULT 0,
            created_time INTEGER NOT NULL,
            attached_image TEXT DEFAULT '',
            tags TEXT DEFAULT '',
            edited BOOLEAN DEFAULT 0,
            type INTEGER DEFAULT 1,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
        # Comments table
        '''CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            rant_id INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            body TEXT NOT NULL,
            score INTEGER DEFAULT 0,
            created_time INTEGER NOT NULL,
            attached_image TEXT DEFAULT '',
            edited BOOLEAN DEFAULT 0,
            FOREIGN KEY (rant_id) REFERENCES rants (id),
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
        # Votes table
        '''CREATE TABLE IF NOT EXISTS votes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            target_id INTEGER NOT NULL,
            target_type TEXT NOT NULL,
            vote INTEGER NOT NULL,
            reason INTEGER,
            UNIQUE(user_id, target_id, target_type),
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
        # Favorites table
        '''CREATE TABLE IF NOT EXISTS favorites (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            rant_id INTEGER NOT NULL,
            UNIQUE(user_id, rant_id),
            FOREIGN KEY (user_id) REFERENCES users (id),
            FOREIGN KEY (rant_id) REFERENCES rants (id)
        )''',
        # Notifications table
        '''CREATE TABLE IF NOT EXISTS notifications (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            type TEXT NOT NULL,
            rant_id INTEGER,
            comment_id INTEGER,
            from_user_id INTEGER,
            created_time INTEGER NOT NULL,
            read BOOLEAN DEFAULT 0,
            FOREIGN KEY (user_id) REFERENCES users (id)
        )''',
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        c = conn.cursor()
        for statement in schema:
            c.execute(statement)
        conn.commit()
    finally:
        # Release the connection even if a DDL statement failed.
        conn.close()


# Make sure the schema exists as soon as the module is imported.
init_db()
# Pydantic models
class UserRegister(BaseModel):
    """Fields describing a new account: e-mail address, username and password."""

    email: EmailStr
    username: str
    password: str
class UserLogin(BaseModel):
    """Credentials pair: a username and the matching password."""

    username: str
    password: str
class RantCreate(BaseModel):
    """Fields of a new rant: its text, its tags string and a numeric type (default 1)."""

    rant: str
    tags: str
    type: int = 1
class CommentCreate(BaseModel):
    """Single-field model carrying the text of a comment."""

    comment: str
class VoteRequest(BaseModel):
    """A vote restricted to -1, 0 or 1, with an optional numeric reason."""

    vote: Literal[-1, 0, 1]
    reason: Optional[int] = None
# Helper functions
def get_db():
    """Open a new SQLite connection to ``DB_PATH`` whose rows allow access by column name.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def hash_password(password: str) -> str:
    """Return the hexadecimal SHA-256 digest of *password* (encoded with the default UTF-8)."""
    # NOTE(review): a single unsalted SHA-256 round is weak for password storage;
    # switching schemes requires migrating stored hashes and the login check together.
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def generate_token() -> str:
    """Return a fresh URL-safe random token built from 32 random bytes."""
    token_bytes = 32
    return secrets.token_urlsafe(nbytes=token_bytes)
def get_current_user(token_id: Optional[int] = Form(None),
                     token_key: Optional[str] = Form(None),
                     user_id: Optional[int] = Form(None)):
    """Validate an auth-token triple and return the authenticated user id.

    Args:
        token_id: Primary key of the ``auth_tokens`` row.
        token_key: Secret key stored in that row.
        user_id: Id of the user the token must belong to.

    Returns:
        ``user_id`` when a row matching all three values exists and its
        ``expire_time`` lies in the future; ``None`` when any value is missing
        (falsy) or no such row exists.
    """
    if not all([token_id, token_key, user_id]):
        return None
    now = int(datetime.now().timestamp())
    conn = get_db()
    try:
        token = conn.execute(
            '''SELECT id FROM auth_tokens
               WHERE id = ? AND token_key = ? AND user_id = ? AND expire_time > ?''',
            (token_id, token_key, user_id, now),
        ).fetchone()
    finally:
        # Close the connection even if the lookup raises.
        conn.close()
    return user_id if token is not None else None
def format_rant(rant_row, user_row, current_user_id=None):
    """Build the API representation of one rant.

    Args:
        rant_row: Mapping with the rant columns ``id``, ``text``, ``score``,
            ``created_time``, ``attached_image``, ``tags``, ``edited``, ``type``.
        user_row: Mapping with the author's ``id``, ``username``, ``score``,
            ``avatar_b`` and ``avatar_i``.
        current_user_id: Id of the requesting user, or a falsy value for
            anonymous requests.

    Returns:
        A dict containing the rant fields, the number of comments, the decoded
        tag list, the requesting user's vote (0 when absent or anonymous) and
        the author's public fields.
    """
    conn = get_db()
    try:
        c = conn.cursor()
        # Get comment count
        c.execute('SELECT COUNT(*) as count FROM comments WHERE rant_id = ?', (rant_row['id'],))
        comment_count = c.fetchone()['count']
        # Get vote state for current user
        vote_state = 0
        if current_user_id:
            c.execute('SELECT vote FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
                      (current_user_id, rant_row['id'], 'rant'))
            vote = c.fetchone()
            if vote:
                vote_state = vote['vote']
    finally:
        # Close the connection even if one of the lookups raises.
        conn.close()
    # Tags are stored as a JSON array; an empty column means "no tags".
    tags = json.loads(rant_row['tags']) if rant_row['tags'] else []
    avatar = {
        "b": user_row['avatar_b'],
        "i": user_row['avatar_i'] or ""
    }
    return {
        "id": rant_row['id'],
        "text": rant_row['text'],
        "score": rant_row['score'],
        "created_time": rant_row['created_time'],
        "attached_image": rant_row['attached_image'],
        "num_comments": comment_count,
        "tags": tags,
        "vote_state": vote_state,
        "edited": bool(rant_row['edited']),
        "rt": rant_row['type'],
        "rc": 1,
        "user_id": user_row['id'],
        "user_username": user_row['username'],
        "user_score": user_row['score'],
        "user_avatar": avatar,
        # Separate copy so the two avatar entries stay independent objects.
        "user_avatar_lg": dict(avatar)
    }
def format_comment(comment_row, user_row, current_user_id=None):
    """Build the API representation of one comment.

    Args:
        comment_row: Mapping with the comment columns ``id``, ``rant_id``,
            ``body``, ``score`` and ``created_time``.
        user_row: Mapping with the author's ``id``, ``username``, ``score``,
            ``avatar_b`` and ``avatar_i``.
        current_user_id: Id of the requesting user, or a falsy value for
            anonymous requests.

    Returns:
        A dict with the comment fields, the requesting user's vote (0 when
        absent or anonymous) and the author's public fields.
    """
    # Get vote state for current user
    vote_state = 0
    if current_user_id:
        conn = get_db()
        try:
            vote = conn.execute(
                'SELECT vote FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
                (current_user_id, comment_row['id'], 'comment'),
            ).fetchone()
        finally:
            # Close the connection even if the lookup raises.
            conn.close()
        if vote:
            vote_state = vote['vote']
    return {
        "id": comment_row['id'],
        "rant_id": comment_row['rant_id'],
        "body": comment_row['body'],
        "score": comment_row['score'],
        "created_time": comment_row['created_time'],
        "vote_state": vote_state,
        "user_id": user_row['id'],
        "user_username": user_row['username'],
        "user_score": user_row['score'],
        "user_avatar": {
            "b": user_row['avatar_b'],
            "i": user_row['avatar_i'] or ""
        }
    }
# Endpoints
@app.post("/api/users")
async def register_user(
    email: str = Form(...),
    username: str = Form(...),
    password: str = Form(...),
    type: int = Form(1),
    app: int = Form(3)
):
    """Create a new user account.

    Returns ``{"success": True}`` on success. On failure returns a dict with
    ``success`` False and an ``error`` message, plus ``error_field`` when the
    problem can be attributed to the username or the e-mail address.
    """
    # Validate username length
    if len(username) < 4 or len(username) > 15:
        return {
            "success": False,
            "error": "Your username must be between 4 and 15 characters.",
            "error_field": "username"
        }
    conn = get_db()
    try:
        c = conn.cursor()
        # Check if username exists
        c.execute('SELECT id FROM users WHERE username = ?', (username,))
        if c.fetchone():
            return {
                "success": False,
                "error": "Username already taken.",
                "error_field": "username"
            }
        # Check if email exists
        c.execute('SELECT id FROM users WHERE email = ?', (email,))
        if c.fetchone():
            return {
                "success": False,
                "error": "Email already registered.",
                "error_field": "email"
            }
        # Create user
        password_hash = hash_password(password)
        created_time = int(datetime.now().timestamp())
        try:
            c.execute('''INSERT INTO users (username, email, password_hash, created_time)
                         VALUES (?, ?, ?, ?)''',
                      (username, email, password_hash, created_time))
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent registration won the race between the checks above
            # and the insert; report it without exposing the raw SQL error.
            return {"success": False, "error": "Username or email already registered."}
        except sqlite3.Error:
            # Keep the "success: False" contract but do not leak database internals.
            return {"success": False, "error": "Registration failed. Please try again."}
        return {"success": True}
    finally:
        conn.close()
@app.post("/api/users/auth-token")
async def login(
    username: str = Form(...),
    password: str = Form(...),
    app: int = Form(3)
):
    """Authenticate by username or e-mail and issue a 30-day auth token.

    Returns a dict with ``success`` True and an ``auth_token`` object (``id``,
    ``key``, ``expire_time``, ``user_id``) on success, otherwise ``success``
    False with an ``error`` message.
    """
    # Hash up front so the work done does not depend on whether the user exists.
    candidate_hash = hash_password(password)
    conn = get_db()
    try:
        c = conn.cursor()
        # Find user
        c.execute('SELECT * FROM users WHERE username = ? OR email = ?', (username, username))
        user = c.fetchone()
        # compare_digest keeps the comparison time independent of where the hashes differ.
        if user is None or not secrets.compare_digest(user['password_hash'], candidate_hash):
            return {
                "success": False,
                "error": "Invalid login credentials entered. Please try again."
            }
        # Create auth token
        token_key = generate_token()
        expire_time = int((datetime.now() + timedelta(days=30)).timestamp())
        c.execute('''INSERT INTO auth_tokens (user_id, token_key, expire_time)
                     VALUES (?, ?, ?)''',
                  (user['id'], token_key, expire_time))
        token_id = c.lastrowid
        conn.commit()
        return {
            "success": True,
            "auth_token": {
                "id": token_id,
                "key": token_key,
                "expire_time": expire_time,
                "user_id": user['id']
            }
        }
    finally:
        conn.close()
@app.get("/api/devrant/rants")
async def get_rants(
    sort: str = "recent",
    limit: int = 20,
    skip: int = 0,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return one page of the rant feed.

    ``sort == "recent"`` orders by creation time (newest first); any other
    value orders by score (highest first). ``limit`` and ``skip`` are passed
    to SQL ``LIMIT``/``OFFSET``. Authentication is optional and only affects
    the ``vote_state`` of each rant.
    """
    current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
    # Only these two fixed fragments are ever interpolated into the SQL text.
    order_by = "created_time DESC" if sort == "recent" else "score DESC"
    conn = get_db()
    try:
        # The author's score is aliased: with "r.*, u.*" both tables contribute
        # a "score" column and a lookup by name returns the rant's value.
        rows = conn.execute(
            f'''SELECT r.*, u.username, u.avatar_b, u.avatar_i, u.score AS user_score
                FROM rants r
                JOIN users u ON r.user_id = u.id
                ORDER BY r.{order_by}
                LIMIT ? OFFSET ?''',
            (limit, skip),
        ).fetchall()
    finally:
        conn.close()
    rant_columns = ('id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type')
    rants = []
    for row in rows:
        rant_data = {col: row[col] for col in rant_columns}
        user_data = {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i'],
        }
        rants.append(format_rant(rant_data, user_data, current_user_id))
    return {
        "success": True,
        "rants": rants,
        "settings": {"notif_state": -1, "notif_token": ""},
        "set": secrets.token_hex(7),
        "wrw": 385,
        "dpp": 0,
        "num_notifs": 0,
        "unread": {"total": 0},
        "news": {
            "id": 356,
            "type": "intlink",
            "headline": "Weekly Group Rant",
            "body": "Share your thoughts!",
            "footer": "Add tag 'wk247' to your rant",
            "height": 100,
            "action": "grouprant"
        }
    }
@app.get("/api/devrant/rants/{rant_id}")
async def get_rant(
    rant_id: int,
    app: int = 3,
    last_comment_id: Optional[int] = None,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return a single rant together with its comments (oldest first).

    The response also carries ``subscribed`` (1 when the authenticated user
    has favorited the rant, else 0). ``last_comment_id`` is accepted but not
    used by this implementation.

    Raises:
        HTTPException: 404 when no rant with ``rant_id`` exists.
    """
    current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
    # The author's score is aliased: with "x.*, u.*" both tables contribute a
    # "score" column and a lookup by name returns the rant's/comment's value.
    author_columns = 'u.username, u.avatar_b, u.avatar_i, u.score AS user_score'

    def author_of(row):
        return {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i'],
        }

    conn = get_db()
    try:
        c = conn.cursor()
        # Get rant
        c.execute(f'''SELECT r.*, {author_columns} FROM rants r
                      JOIN users u ON r.user_id = u.id
                      WHERE r.id = ?''', (rant_id,))
        rant_row = c.fetchone()
        if not rant_row:
            raise HTTPException(status_code=404, detail="Rant not found")
        # Get comments
        c.execute(f'''SELECT c.*, {author_columns} FROM comments c
                      JOIN users u ON c.user_id = u.id
                      WHERE c.rant_id = ?
                      ORDER BY c.created_time ASC''', (rant_id,))
        comment_rows = c.fetchall()
        # Check if subscribed (favorited)
        subscribed = 0
        if current_user_id:
            c.execute('SELECT id FROM favorites WHERE user_id = ? AND rant_id = ?',
                      (current_user_id, rant_id))
            if c.fetchone():
                subscribed = 1
    finally:
        conn.close()
    rant_columns = ('id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type')
    comment_columns = ('id', 'rant_id', 'body', 'score', 'created_time')
    rant = format_rant({col: rant_row[col] for col in rant_columns},
                       author_of(rant_row), current_user_id)
    comments = [
        format_comment({col: row[col] for col in comment_columns},
                       author_of(row), current_user_id)
        for row in comment_rows
    ]
    # Add link to rant
    rant['link'] = f"rants/{rant_id}/{rant['text'][:50].replace(' ', '-')}"
    return {
        "rant": rant,
        "comments": comments,
        "success": True,
        "subscribed": subscribed
    }
@app.post("/api/devrant/rants")
async def create_rant(
    rant: str = Form(...),
    tags: str = Form(...),
    type: int = Form(1),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...),
    image: Optional[UploadFile] = File(None)
):
    """Create a rant for the authenticated user, optionally with an image.

    Rejects an identical text posted by the same user within the last 300
    seconds. ``tags`` is a comma-separated string that is stored as a JSON
    list. Returns ``{"success": True, "rant_id": ...}`` on success, otherwise
    ``success`` False with an ``error`` message.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    # Uploaded files are served back from /uploads, so only accept image types.
    allowed_extensions = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check for duplicate rant
        c.execute('''SELECT id FROM rants
                     WHERE user_id = ? AND text = ? AND created_time > ?''',
                  (current_user_id, rant, int(datetime.now().timestamp()) - 300))
        if c.fetchone():
            return {
                "success": False,
                "error": "It looks like you just posted this same rant! Your connection might have timed out while posting so you might have seen an error, but sometimes the rant still gets posted and in this case it seems it did, so please check :) If this was not the case please contact info@devrant.io. Thanks!"
            }
        # Handle image upload
        image_path = ""
        # An empty file input still yields an upload part, just without a filename.
        if image and image.filename:
            file_ext = os.path.splitext(image.filename)[1].lower()
            if file_ext not in allowed_extensions:
                return {"success": False, "error": "Unsupported image type."}
            # The stored name is random; only the vetted extension comes from the client.
            file_name = f"{secrets.token_hex(16)}{file_ext}"
            file_path = UPLOAD_DIR / file_name
            with open(file_path, "wb") as f:
                f.write(await image.read())
            image_path = f"/uploads/{file_name}"
        # Create rant
        created_time = int(datetime.now().timestamp())
        tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
        c.execute('''INSERT INTO rants (user_id, text, created_time, attached_image, tags, type)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (current_user_id, rant, created_time, image_path, json.dumps(tags_list), type))
        rant_id = c.lastrowid
        conn.commit()
        return {"success": True, "rant_id": rant_id}
    finally:
        conn.close()
@app.post("/api/devrant/rants/{rant_id}")
async def update_rant(
    rant_id: int,
    rant: str = Form(...),
    tags: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Replace the text and tags of a rant owned by the authenticated user.

    Marks the rant as edited. A missing rant and a rant owned by someone else
    both yield ``{"success": False, "fail_reason": "Unauthorized"}``.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check ownership
        c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
        rant_row = c.fetchone()
        if not rant_row or rant_row['user_id'] != current_user_id:
            return {"success": False, "fail_reason": "Unauthorized"}
        # Update rant
        tags_list = [tag.strip() for tag in tags.split(',') if tag.strip()]
        c.execute('''UPDATE rants SET text = ?, tags = ?, edited = 1
                     WHERE id = ?''',
                  (rant, json.dumps(tags_list), rant_id))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.delete("/api/devrant/rants/{rant_id}")
async def delete_rant(
    rant_id: int,
    app: int = 3,
    token_id: int = None,
    token_key: str = None,
    user_id: int = None
):
    """Delete a rant owned by the authenticated user along with its dependent rows.

    Removes the rant's comments, the votes on the rant and on those comments,
    its favorites and the notifications that point at it.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check ownership
        c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
        rant_row = c.fetchone()
        if not rant_row:
            return {"success": False, "error": "Rant not found"}
        if rant_row['user_id'] != current_user_id:
            return {"success": False, "error": "Unauthorized"}
        # Delete rant and related data
        # Comment votes must go first: they are located through the comments table.
        c.execute('''DELETE FROM votes
                     WHERE target_type = ?
                       AND target_id IN (SELECT id FROM comments WHERE rant_id = ?)''',
                  ('comment', rant_id))
        c.execute('DELETE FROM comments WHERE rant_id = ?', (rant_id,))
        c.execute('DELETE FROM votes WHERE target_id = ? AND target_type = ?', (rant_id, 'rant'))
        c.execute('DELETE FROM favorites WHERE rant_id = ?', (rant_id,))
        # Notifications would otherwise keep pointing at a rant that no longer exists.
        c.execute('DELETE FROM notifications WHERE rant_id = ?', (rant_id,))
        c.execute('DELETE FROM rants WHERE id = ?', (rant_id,))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.post("/api/devrant/rants/{rant_id}/vote")
async def vote_rant(
    rant_id: int,
    vote: int = Form(...),
    reason: Optional[int] = Form(None),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Cast, change or remove the authenticated user's vote on a rant.

    ``vote`` must be -1, 0 or 1; 0 removes an existing vote. The rant's score
    and its author's score are both adjusted by the difference between the
    new and the previous vote. Returns the updated rant on success.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    # Any other integer would be added to the scores verbatim.
    if vote not in (-1, 0, 1):
        return {"success": False, "error": "Invalid vote"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Get rant
        c.execute('SELECT * FROM rants WHERE id = ?', (rant_id,))
        rant = c.fetchone()
        if not rant:
            return {"success": False, "error": "Rant not found"}
        # Check for existing vote
        c.execute('SELECT * FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
                  (current_user_id, rant_id, 'rant'))
        existing_vote = c.fetchone()
        previous = existing_vote['vote'] if existing_vote else 0
        # One delta drives both score updates so they can never drift apart.
        score_diff = vote - previous
        if existing_vote and vote == 0:
            # Remove vote
            c.execute('DELETE FROM votes WHERE id = ?', (existing_vote['id'],))
        elif existing_vote:
            # Update vote
            c.execute('UPDATE votes SET vote = ?, reason = ? WHERE id = ?',
                      (vote, reason, existing_vote['id']))
        elif vote != 0:
            # New vote
            c.execute('''INSERT INTO votes (user_id, target_id, target_type, vote, reason)
                         VALUES (?, ?, ?, ?, ?)''',
                      (current_user_id, rant_id, 'rant', vote, reason))
        if score_diff:
            c.execute('UPDATE rants SET score = score + ? WHERE id = ?',
                      (score_diff, rant_id))
            # Update user score
            c.execute('UPDATE users SET score = score + ? WHERE id = ?',
                      (score_diff, rant['user_id']))
        conn.commit()
        # Get updated rant; the author's score is aliased because "r.*, u.*"
        # would expose two "score" columns and name lookup returns the rant's.
        c.execute('''SELECT r.*, u.username, u.avatar_b, u.avatar_i, u.score AS user_score
                     FROM rants r
                     JOIN users u ON r.user_id = u.id
                     WHERE r.id = ?''', (rant_id,))
        updated_rant = c.fetchone()
    finally:
        conn.close()
    rant_columns = ('id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type')
    rant_data = {col: updated_rant[col] for col in rant_columns}
    user_data = {
        'id': updated_rant['user_id'],
        'username': updated_rant['username'],
        'score': updated_rant['user_score'],
        'avatar_b': updated_rant['avatar_b'],
        'avatar_i': updated_rant['avatar_i'],
    }
    return {
        "success": True,
        "rant": format_rant(rant_data, user_data, current_user_id)
    }
@app.post("/api/devrant/rants/{rant_id}/favorite")
async def favorite_rant(
    rant_id: int,
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Add a rant to the authenticated user's favorites.

    Returns ``success`` False with an ``error`` when the caller is not
    authenticated, the rant does not exist, or it is already favorited.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # SQLite does not enforce foreign keys unless asked to, so check explicitly.
        c.execute('SELECT id FROM rants WHERE id = ?', (rant_id,))
        if not c.fetchone():
            return {"success": False, "error": "Rant not found"}
        try:
            c.execute('INSERT INTO favorites (user_id, rant_id) VALUES (?, ?)',
                      (current_user_id, rant_id))
            conn.commit()
        except sqlite3.IntegrityError:
            # The UNIQUE(user_id, rant_id) constraint rejected a duplicate.
            return {"success": False, "error": "Already favorited"}
        return {"success": True}
    finally:
        conn.close()
@app.post("/api/devrant/rants/{rant_id}/unfavorite")
async def unfavorite_rant(
    rant_id: int,
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Remove a rant from the authenticated user's favorites.

    Succeeds even when the rant was not favorited in the first place.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        conn.execute('DELETE FROM favorites WHERE user_id = ? AND rant_id = ?',
                     (current_user_id, rant_id))
        conn.commit()
    finally:
        # Close the connection even if the delete raises.
        conn.close()
    return {"success": True}
@app.post("/api/devrant/rants/{rant_id}/comments")
async def create_comment(
    rant_id: int,
    comment: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...),
    image: Optional[UploadFile] = File(None)
):
    """Add a comment to a rant, optionally with an image.

    When the commenter is not the rant's owner, a ``comment`` notification is
    stored for the owner. Returns ``{"success": True}`` on success.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "confirmed": False}
    # Uploaded files are served back from /uploads, so only accept image types.
    allowed_extensions = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check if rant exists
        c.execute('SELECT user_id FROM rants WHERE id = ?', (rant_id,))
        rant_owner = c.fetchone()
        if not rant_owner:
            return {"success": False, "error": "Rant not found"}
        # Handle image upload
        image_path = ""
        # An empty file input still yields an upload part, just without a filename.
        if image and image.filename:
            file_ext = os.path.splitext(image.filename)[1].lower()
            if file_ext not in allowed_extensions:
                return {"success": False, "error": "Unsupported image type."}
            # The stored name is random; only the vetted extension comes from the client.
            file_name = f"{secrets.token_hex(16)}{file_ext}"
            file_path = UPLOAD_DIR / file_name
            with open(file_path, "wb") as f:
                f.write(await image.read())
            image_path = f"/uploads/{file_name}"
        # Create comment
        created_time = int(datetime.now().timestamp())
        c.execute('''INSERT INTO comments (rant_id, user_id, body, created_time, attached_image)
                     VALUES (?, ?, ?, ?, ?)''',
                  (rant_id, current_user_id, comment, created_time, image_path))
        comment_id = c.lastrowid
        # Create notification for rant owner
        if rant_owner['user_id'] != current_user_id:
            c.execute('''INSERT INTO notifications (user_id, type, rant_id, comment_id, from_user_id, created_time)
                         VALUES (?, ?, ?, ?, ?, ?)''',
                      (rant_owner['user_id'], 'comment', rant_id, comment_id, current_user_id, created_time))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.get("/api/comments/{comment_id}")
async def get_comment(
    comment_id: int,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return a single comment with its author's public fields.

    Authentication is optional and only affects ``vote_state``.
    """
    current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
    conn = get_db()
    try:
        # The author's score is aliased: with "c.*, u.*" both tables contribute
        # a "score" column and a lookup by name returns the comment's value.
        row = conn.execute(
            '''SELECT c.*, u.username, u.avatar_b, u.avatar_i, u.score AS user_score
               FROM comments c
               JOIN users u ON c.user_id = u.id
               WHERE c.id = ?''',
            (comment_id,),
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return {"success": False, "error": "Invalid comment specified in path."}
    comment_data = {col: row[col] for col in
                    ('id', 'rant_id', 'body', 'score', 'created_time')}
    user_data = {
        'id': row['user_id'],
        'username': row['username'],
        'score': row['user_score'],
        'avatar_b': row['avatar_b'],
        'avatar_i': row['avatar_i'],
    }
    return {
        "success": True,
        "comment": format_comment(comment_data, user_data, current_user_id)
    }
@app.post("/api/comments/{comment_id}")
async def update_comment(
    comment_id: int,
    comment: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Replace the body of a comment owned by the authenticated user and mark it edited."""
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check ownership
        c.execute('SELECT user_id FROM comments WHERE id = ?', (comment_id,))
        comment_row = c.fetchone()
        if not comment_row:
            return {"success": False, "error": "Invalid comment specified in path."}
        if comment_row['user_id'] != current_user_id:
            return {"success": False, "fail_reason": "Unauthorized"}
        # Update comment
        c.execute('UPDATE comments SET body = ?, edited = 1 WHERE id = ?',
                  (comment, comment_id))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.delete("/api/comments/{comment_id}")
async def delete_comment(
    comment_id: int,
    app: int = 3,
    token_id: int = None,
    token_key: str = None,
    user_id: int = None
):
    """Delete a comment owned by the authenticated user.

    Also removes the votes cast on the comment and the notifications that
    refer to it.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Check ownership
        c.execute('SELECT user_id FROM comments WHERE id = ?', (comment_id,))
        comment_row = c.fetchone()
        if not comment_row:
            return {"success": False, "error": "Invalid comment specified in path."}
        if comment_row['user_id'] != current_user_id:
            return {"success": False, "error": "Unauthorized"}
        # Delete comment and related data
        c.execute('DELETE FROM votes WHERE target_id = ? AND target_type = ?', (comment_id, 'comment'))
        # Notifications would otherwise keep pointing at a comment that no longer exists.
        c.execute('DELETE FROM notifications WHERE comment_id = ?', (comment_id,))
        c.execute('DELETE FROM comments WHERE id = ?', (comment_id,))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.post("/api/comments/{comment_id}/vote")
async def vote_comment(
    comment_id: int,
    vote: int = Form(...),
    reason: Optional[int] = Form(None),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Cast, change or remove the authenticated user's vote on a comment.

    ``vote`` must be -1, 0 or 1; 0 removes an existing vote. The comment's
    score is adjusted by the difference between the new and the previous vote.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    # Any other integer would be added to the score verbatim.
    if vote not in (-1, 0, 1):
        return {"success": False, "error": "Invalid vote"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Get comment
        c.execute('SELECT * FROM comments WHERE id = ?', (comment_id,))
        comment = c.fetchone()
        if not comment:
            return {"success": False, "error": "Invalid comment specified in path."}
        # Check for existing vote
        c.execute('SELECT * FROM votes WHERE user_id = ? AND target_id = ? AND target_type = ?',
                  (current_user_id, comment_id, 'comment'))
        existing_vote = c.fetchone()
        previous = existing_vote['vote'] if existing_vote else 0
        score_diff = vote - previous
        if existing_vote and vote == 0:
            # Remove vote
            c.execute('DELETE FROM votes WHERE id = ?', (existing_vote['id'],))
        elif existing_vote:
            # Update vote
            c.execute('UPDATE votes SET vote = ?, reason = ? WHERE id = ?',
                      (vote, reason, existing_vote['id']))
        elif vote != 0:
            # New vote
            c.execute('''INSERT INTO votes (user_id, target_id, target_type, vote, reason)
                         VALUES (?, ?, ?, ?, ?)''',
                      (current_user_id, comment_id, 'comment', vote, reason))
        if score_diff:
            c.execute('UPDATE comments SET score = score + ? WHERE id = ?',
                      (score_diff, comment_id))
        conn.commit()
        return {"success": True}
    finally:
        conn.close()
@app.get("/api/users/{user_id}")
async def get_profile(
    user_id: int,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    auth_user_id: Optional[int] = None
):
    """Return a user's public profile with up to 50 rants, comments and favorites each.

    ``user_id`` identifies the profile; the optional ``token_id``,
    ``token_key`` and ``auth_user_id`` identify the requester and only affect
    the ``vote_state`` values in the returned content.
    """
    current_user_id = get_current_user(token_id, token_key, auth_user_id) if token_id else None
    rant_columns = ('id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type')
    comment_columns = ('id', 'rant_id', 'body', 'score', 'created_time')
    # The author's score is aliased: with "x.*, u.*" both tables contribute a
    # "score" column and a lookup by name returns the rant's/comment's value.
    author_columns = 'u.username, u.avatar_b, u.avatar_i, u.score AS user_score'

    def author_of(row):
        return {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i'],
        }

    def rants_from(rows):
        return [format_rant({col: row[col] for col in rant_columns},
                            author_of(row), current_user_id)
                for row in rows]

    conn = get_db()
    try:
        c = conn.cursor()
        # Get user
        c.execute('SELECT * FROM users WHERE id = ?', (user_id,))
        user = c.fetchone()
        if not user:
            return {"success": False, "error": "User not found"}
        # Get user's rants
        c.execute(f'''SELECT r.*, {author_columns} FROM rants r
                      JOIN users u ON r.user_id = u.id
                      WHERE r.user_id = ?
                      ORDER BY r.created_time DESC
                      LIMIT 50''', (user_id,))
        rant_rows = c.fetchall()
        # Get user's comments
        c.execute(f'''SELECT c.*, {author_columns} FROM comments c
                      JOIN users u ON c.user_id = u.id
                      WHERE c.user_id = ?
                      ORDER BY c.created_time DESC
                      LIMIT 50''', (user_id,))
        comment_rows = c.fetchall()
        # Get favorited rants
        c.execute(f'''SELECT r.*, {author_columns} FROM rants r
                      JOIN users u ON r.user_id = u.id
                      JOIN favorites f ON f.rant_id = r.id
                      WHERE f.user_id = ?
                      ORDER BY f.id DESC
                      LIMIT 50''', (user_id,))
        favorite_rows = c.fetchall()
    finally:
        conn.close()
    rants = rants_from(rant_rows)
    comments = [
        format_comment({col: row[col] for col in comment_columns},
                       author_of(row), current_user_id)
        for row in comment_rows
    ]
    favorites = rants_from(favorite_rows)
    return {
        "success": True,
        "profile": {
            "username": user['username'],
            "score": user['score'],
            "about": user['about'],
            "location": user['location'],
            "created_time": user['created_time'],
            "skills": user['skills'],
            "github": user['github'],
            "website": user['website'],
            "avatar": {
                "b": user['avatar_b'],
                "i": user['avatar_i'] or ""
            },
            "content": {
                "content": {
                    "rants": rants,
                    "comments": comments,
                    "favorites": favorites
                }
            }
        }
    }
@app.get("/api/get-user-id")
async def get_user_id(
    username: str,
    app: int = 3
):
    """Look up a user's numeric id by exact username."""
    conn = get_db()
    try:
        user = conn.execute('SELECT id FROM users WHERE username = ?', (username,)).fetchone()
    finally:
        # Close the connection even if the lookup raises.
        conn.close()
    if not user:
        return {"success": False, "error": "User not found"}
    return {"success": True, "user_id": user['id']}
@app.get("/api/devrant/search")
async def search(
    term: str,
    app: int = 3,
    token_id: Optional[int] = None,
    token_key: Optional[str] = None,
    user_id: Optional[int] = None
):
    """Return up to 50 rants whose text or stored tags contain ``term``, best score first.

    ``term`` is matched literally: ``%`` and ``_`` inside it are escaped so
    they do not act as SQL ``LIKE`` wildcards.
    """
    current_user_id = get_current_user(token_id, token_key, user_id) if token_id else None
    # Escape the escape character first, then the two LIKE wildcards.
    escaped = term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    pattern = f'%{escaped}%'
    conn = get_db()
    try:
        # Search rants; the author's score is aliased because "r.*, u.*" would
        # expose two "score" columns and name lookup returns the rant's.
        rows = conn.execute(
            '''SELECT r.*, u.username, u.avatar_b, u.avatar_i, u.score AS user_score
               FROM rants r
               JOIN users u ON r.user_id = u.id
               WHERE r.text LIKE ? ESCAPE '\\' OR r.tags LIKE ? ESCAPE '\\'
               ORDER BY r.score DESC
               LIMIT 50''',
            (pattern, pattern),
        ).fetchall()
    finally:
        conn.close()
    rant_columns = ('id', 'text', 'score', 'created_time', 'attached_image', 'tags', 'edited', 'type')
    results = []
    for row in rows:
        rant_data = {col: row[col] for col in rant_columns}
        user_data = {
            'id': row['user_id'],
            'username': row['username'],
            'score': row['user_score'],
            'avatar_b': row['avatar_b'],
            'avatar_i': row['avatar_i'],
        }
        results.append(format_rant(rant_data, user_data, current_user_id))
    return {"success": True, "results": results}
@app.get("/api/users/me/notif-feed")
async def get_notifications(
    ext_prof: int = 1,
    last_time: Optional[int] = None,
    app: int = 3,
    token_id: int = None,
    token_key: str = None,
    user_id: int = None
):
    """Return the authenticated user's 50 newest notifications and mark all of theirs as read.

    The unread counters in the response are computed from the returned items
    before they are marked read. ``ext_prof`` and ``last_time`` are accepted
    but not used by this implementation.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        c = conn.cursor()
        # Get notifications
        c.execute('''SELECT n.*, u.username FROM notifications n
                     LEFT JOIN users u ON n.from_user_id = u.id
                     WHERE n.user_id = ?
                     ORDER BY n.created_time DESC
                     LIMIT 50''', (current_user_id,))
        rows = c.fetchall()
        # Mark as read
        c.execute('UPDATE notifications SET read = 1 WHERE user_id = ?', (current_user_id,))
        conn.commit()
    finally:
        conn.close()
    items = [
        {
            "type": row['type'],
            "rant_id": row['rant_id'],
            "comment_id": row['comment_id'],
            "created_time": row['created_time'],
            "read": row['read'],
            "uid": row['from_user_id'],
            # LEFT JOIN: the sender may no longer exist, in which case username is NULL.
            "username": row['username'] or ""
        }
        for row in rows
    ]
    unread_count = sum(1 for row in rows if not row['read'])
    return {
        "success": True,
        "data": {
            "items": items,
            "check_time": int(datetime.now().timestamp()),
            "username_map": [],
            "unread": {
                "all": unread_count,
                "upvotes": 0,
                "mentions": 0,
                "comments": unread_count,
                "subs": 0,
                "total": unread_count
            },
            "num_unread": unread_count
        }
    }
@app.delete("/api/users/me/notif-feed")
async def clear_notifications(
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Delete every notification that belongs to the authenticated user."""
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        conn.execute('DELETE FROM notifications WHERE user_id = ?', (current_user_id,))
        conn.commit()
    finally:
        # Close the connection even if the delete raises.
        conn.close()
    return {"success": True}
@app.post("/api/users/me/edit-profile")
async def edit_profile(
    profile_about: str = Form(""),
    profile_skills: str = Form(""),
    profile_location: str = Form(""),
    profile_website: str = Form(""),
    profile_github: str = Form(""),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Overwrite the authenticated user's about, skills, location, website and github fields.

    Fields omitted from the form default to the empty string and therefore
    clear the stored value.
    """
    current_user_id = get_current_user(token_id, token_key, user_id)
    if not current_user_id:
        return {"success": False, "error": "Authentication required"}
    conn = get_db()
    try:
        conn.execute('''UPDATE users SET about = ?, skills = ?, location = ?, website = ?, github = ?
                        WHERE id = ?''',
                     (profile_about, profile_skills, profile_location, profile_website,
                      profile_github, current_user_id))
        conn.commit()
    finally:
        # Close the connection even if the update raises.
        conn.close()
    return {"success": True}
@app.post("/api/users/forgot-password")
async def forgot_password(
    username: str = Form(...),
    app: int = Form(3)
):
    """Placeholder endpoint: always reports success without doing anything."""
    # In a real implementation, this would send an email
    response = {"success": True}
    return response
@app.post("/api/users/me/resend-confirm")
async def resend_confirmation(
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Placeholder endpoint: reports success for any authenticated caller."""
    current_user_id = get_current_user(token_id, token_key, user_id)
    if current_user_id:
        # In a real implementation, this would send an email
        return {"success": True}
    return {"success": False, "error": "Authentication required"}
@app.post("/api/users/me/mark-news-read")
async def mark_news_read(
    news_id: str = Form(...),
    app: int = Form(3),
    token_id: int = Form(...),
    token_key: str = Form(...),
    user_id: int = Form(...)
):
    """Placeholder endpoint: reports success for any authenticated caller; ``news_id`` is not stored."""
    current_user_id = get_current_user(token_id, token_key, user_id)
    if current_user_id:
        # In a real implementation, this would mark news as read
        return {"success": True}
    return {"success": False, "error": "Authentication required"}
# Serve uploaded files
@app.get("/uploads/{filename}")
async def get_upload(filename: str):
    """Serve a previously uploaded file by name.

    Raises:
        HTTPException: 404 when ``filename`` does not name a regular file
            located directly inside the upload directory.
    """
    upload_root = UPLOAD_DIR.resolve()
    file_path = (upload_root / filename).resolve()
    # Require a regular file that sits directly in the upload directory: this
    # rejects directories such as ".." and any name that resolves elsewhere.
    if file_path.parent == upload_root and file_path.is_file():
        return FileResponse(file_path)
    raise HTTPException(status_code=404, detail="File not found")
# Serve static files (for frontend)
from fastapi.staticfiles import StaticFiles

# Expose the contents of the local "static" directory under the /static URL prefix.
_static_files = StaticFiles(directory="static")
app.mount("/static", _static_files, name="static")
# Root endpoint serves the main HTML
@app.get("/")
async def root():
    """Serve the front-end entry page from the static directory."""
    index_page = "static/index.html"
    return FileResponse(index_page)
# Start the development server when the module is executed directly.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8111)