1115 lines
40 KiB
Python
Raw Normal View History

2025-10-02 13:22:43 +02:00
import sqlite3
import hashlib
import time as timelib
import secrets
from typing import Optional, List
from datetime import datetime, timedelta
from fastapi import FastAPI, Header, HTTPException, Query
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
class PronounceableIDGenerator:
syllables = [
"ba","be","bi","bo","bu","da","de","di","do","du",
"fa","fe","fi","fo","fu","ga","ge","gi","go","gu",
"ha","he","hi","ho","hu","ja","je","ji","jo","ju",
"ka","ke","ki","ko","ku","la","le","li","lo","lu",
"ma","me","mi","mo","mu","na","ne","ni","no","nu",
"pa","pe","pi","po","pu","ra","re","ri","ro","ru",
"sa","se","si","so","su","ta","te","ti","to","tu",
"va","ve","vi","vo","vu","wa","we","wi","wo","wu",
"ya","ye","yi","yo","yu","za","ze","zi","zo","zu",
"cha","che","chi","cho","chu","sha","she","shi","sho","shu"
]
base = len(syllables)
key = 982451653
def __init__(self, start=0):
self.counter = start
def scramble(self, n):
return (n * self.key) % 100_000_000
def encode(self, n):
s = []
for _ in range(4):
s.append(self.syllables[n % self.base])
n //= self.base
return ''.join(reversed(s))
def next_id(self):
n = self.scramble(self.counter)
self.counter += 1
return self.encode(n)
class Database:
def __init__(self, db_path="community.db"):
self.db_path = db_path
self._id = 0
self.init_db()
def get_conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db(self):
conn = self.get_conn()
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS uid (
counter INTEGER DEFAULT 0
)
""")
count = c.execute("SELECT counter FROM uid").fetchone()
if not count:
c.execute("INSERT INTO uid (counter) VALUES (0)")
c.execute("""CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
expires_at INTEGER,
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)""")
c.execute("""CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
karma INTEGER DEFAULT 0,
created_at INTEGER DEFAULT (strftime('%s', 'now'))
)""")
c.execute("""CREATE TABLE IF NOT EXISTS communities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT,
rules TEXT,
wiki TEXT,
sidebar TEXT,
creator_id INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (creator_id) REFERENCES users(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS moderators (
community_id INTEGER,
user_id INTEGER,
PRIMARY KEY (community_id, user_id),
FOREIGN KEY (community_id) REFERENCES communities(id),
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
community_id INTEGER,
user_id INTEGER,
title TEXT NOT NULL,
content TEXT,
url TEXT,
post_type TEXT DEFAULT 'text',
flair TEXT,
nsfw INTEGER DEFAULT 0,
spoiler INTEGER DEFAULT 0,
sticky INTEGER DEFAULT 0,
locked INTEGER DEFAULT 0,
score INTEGER DEFAULT 0,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (community_id) REFERENCES communities(id),
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_posts_community ON posts(community_id)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_posts_user ON posts(user_id)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_posts_score ON posts(score)""")
c.execute("""CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER,
parent_id INTEGER,
user_id INTEGER,
content TEXT NOT NULL,
score INTEGER DEFAULT 0,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (post_id) REFERENCES posts(id),
FOREIGN KEY (parent_id) REFERENCES comments(id),
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_comments_post ON comments(post_id)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_comments_user ON comments(user_id)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_comments_parent ON comments(parent_id)""")
c.execute("""CREATE TABLE IF NOT EXISTS votes (
user_id INTEGER,
target_type TEXT,
target_id INTEGER,
vote INTEGER,
PRIMARY KEY (user_id, target_type, target_id),
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE INDEX IF NOT EXISTS idx_votes_target ON votes(target_type, target_id)""")
c.execute("""CREATE TABLE IF NOT EXISTS subscriptions (
user_id INTEGER,
community_id INTEGER,
PRIMARY KEY (user_id, community_id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (community_id) REFERENCES communities(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS saved (
user_id INTEGER,
target_type TEXT,
target_id INTEGER,
PRIMARY KEY (user_id, target_type, target_id),
FOREIGN KEY (user_id) REFERENCES users(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS hidden (
user_id INTEGER,
post_id INTEGER,
PRIMARY KEY (user_id, post_id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (post_id) REFERENCES posts(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_user_id INTEGER,
to_user_id INTEGER,
subject TEXT,
content TEXT,
read INTEGER DEFAULT 0,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (from_user_id) REFERENCES users(id),
FOREIGN KEY (to_user_id) REFERENCES users(id)
)""")
c.execute("""CREATE TABLE IF NOT EXISTS user_flair (
user_id INTEGER,
community_id INTEGER,
flair TEXT,
PRIMARY KEY (user_id, community_id),
FOREIGN KEY (user_id) REFERENCES users(id),
FOREIGN KEY (community_id) REFERENCES communities(id)
)""")
conn.commit()
conn.close()
def get_id(self):
if not self._id:
self._id = self._get_counter()
return self.update_id()
def _get_counter(self):
conn = self.get_conn()
c = conn.cursor()
c.execute("SELECT counter FROM uid")
return c.fetchone()[0]
def update_id(self):
conn = self.get_conn()
c = conn.cursor()
c.execute("UPDATE uid SET counter = counter + 1")
conn.commit()
conn.close()
self._id += 1
return self._id
def get_uid(self):
gen = PronounceableIDGenerator(self.get_id())
return gen.next_id()
# Pydantic Models
class RegisterRequest(BaseModel):
username: str = Field(..., min_length=3)
password: str = Field(..., min_length=6)
class LoginRequest(BaseModel):
username: str
password: str
class CreateCommunityRequest(BaseModel):
name: str = Field(..., min_length=3)
description: Optional[str] = ""
class CreatePostRequest(BaseModel):
community_id: int
title: str = Field(..., min_length=3)
content: Optional[str] = ""
url: Optional[str] = ""
type: Optional[str] = "text"
flair: Optional[str] = ""
nsfw: Optional[int] = 0
spoiler: Optional[int] = 0
class VoteRequest(BaseModel):
target_type: str
target_id: int
vote: int
class CreateCommentRequest(BaseModel):
post_id: int
parent_id: Optional[int] = None
content: str = Field(..., min_length=1)
class SubscribeRequest(BaseModel):
community_id: int
action: str
class SaveRequest(BaseModel):
target_type: str
target_id: int
action: str
class HideRequest(BaseModel):
post_id: int
action: str
class SendMessageRequest(BaseModel):
to_username: str
subject: str = Field(..., min_length=1)
content: str = Field(..., min_length=1)
class StickyRequest(BaseModel):
post_id: int
sticky: int
class LockRequest(BaseModel):
post_id: int
locked: int
class SetFlairRequest(BaseModel):
community_id: int
flair: Optional[str] = ""
class UpdateCommunityRequest(BaseModel):
community_id: int
rules: Optional[str] = None
wiki: Optional[str] = None
sidebar: Optional[str] = None
# Initialize FastAPI
app = FastAPI()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Database
db = Database()
# Helper functions
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
def get_session_user(x_session_token: Optional[str] = None, authorization: Optional[str] = None) -> Optional[int]:
"""Get user_id from session token"""
token = x_session_token or authorization
if not token:
return None
conn = db.get_conn()
c = conn.cursor()
now = int(timelib.time())
c.execute("SELECT user_id FROM sessions WHERE token = ? AND expires_at > ?", (token, now))
result = c.fetchone()
conn.close()
return result["user_id"] if result else None
def create_session(user_id: int) -> str:
"""Create a new session for user"""
token = secrets.token_urlsafe(32)
expires_at = int(timelib.time()) + (30 * 24 * 60 * 60)
conn = db.get_conn()
c = conn.cursor()
c.execute("DELETE FROM sessions WHERE expires_at < ?", (int(timelib.time()),))
c.execute("INSERT INTO sessions (token, user_id, expires_at) VALUES (?, ?, ?)",
(token, user_id, expires_at))
conn.commit()
conn.close()
return token
def calculate_hot_score(score: int, created_at: int) -> float:
now = timelib.time()
age_hours = (now - created_at) / 3600
return (score - 1) / pow(age_hours + 2, 1.5)
def calculate_controversial_score(upvotes: int, downvotes: int) -> float:
if upvotes == 0 and downvotes == 0:
return 0
total = upvotes + downvotes
balance = min(upvotes, downvotes)
return total * balance
# Routes
@app.get("/")
@app.get("/index.html")
async def serve_index():
return FileResponse("index.html")
@app.post("/api/register")
async def api_register(request: RegisterRequest):
conn = db.get_conn()
c = conn.cursor()
try:
c.execute("INSERT INTO users (username, password) VALUES (?, ?)",
(request.username, hash_password(request.password)))
conn.commit()
user_id = c.lastrowid
c.execute("SELECT id, username, karma FROM users WHERE id = ?", (user_id,))
user = c.fetchone()
conn.close()
token = create_session(user_id)
return {"success": True, "token": token, "username": user["username"], "karma": user["karma"]}
except sqlite3.IntegrityError:
conn.close()
return {"success": False, "error": "Username already exists"}
@app.post("/api/login")
async def api_login(request: LoginRequest):
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT * FROM users WHERE username = ? AND password = ?",
(request.username, hash_password(request.password)))
user = c.fetchone()
conn.close()
if user:
token = create_session(user["id"])
return {"success": True, "token": token, "user_id": user["id"], "username": user["username"], "karma": user["karma"]}
return {"success": False, "error": "Invalid credentials"}
@app.post("/api/logout")
async def api_logout(x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
token = x_session_token or authorization
if token:
conn = db.get_conn()
c = conn.cursor()
c.execute("DELETE FROM sessions WHERE token = ?", (token,))
conn.commit()
conn.close()
return {"success": True}
@app.get("/api/me")
async def api_me(x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT id, username, karma FROM users WHERE id = ?", (user_id,))
user = c.fetchone()
conn.close()
if user:
return {"success": True, "user": dict(user)}
return {"success": False, "error": "User not found"}
@app.post("/api/create_community")
async def api_create_community(request: CreateCommunityRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
try:
c.execute("INSERT INTO communities (name, description, creator_id) VALUES (?, ?, ?)",
(request.name, request.description, user_id))
community_id = c.lastrowid
c.execute("INSERT INTO moderators (community_id, user_id) VALUES (?, ?)",
(community_id, user_id))
c.execute("INSERT INTO subscriptions (user_id, community_id) VALUES (?, ?)",
(user_id, community_id))
conn.commit()
conn.close()
return {"success": True, "community_id": community_id}
except sqlite3.IntegrityError:
conn.close()
return {"success": False, "error": "Community already exists"}
@app.get("/api/communities")
async def api_get_communities():
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT c.*, u.username as creator,
(SELECT COUNT(*) FROM subscriptions WHERE community_id = c.id) as subscribers
FROM communities c
LEFT JOIN users u ON c.creator_id = u.id
ORDER BY subscribers DESC""")
communities = [dict(row) for row in c.fetchall()]
conn.close()
return {"communities": communities}
@app.get("/api/community")
async def api_get_community(id: Optional[int] = None, name: Optional[str] = None):
conn = db.get_conn()
c = conn.cursor()
if id:
c.execute("SELECT * FROM communities WHERE id = ?", (id,))
elif name:
c.execute("SELECT * FROM communities WHERE name = ?", (name,))
else:
conn.close()
return {"success": False, "error": "ID or name required"}
community = c.fetchone()
conn.close()
if community:
return {"success": True, "community": dict(community)}
return {"success": False, "error": "Community not found"}
@app.get("/api/posts")
async def api_get_posts(
community_id: Optional[int] = None,
sort: str = "hot",
time: str = "all",
feed: str = "all",
limit: int = 50,
offset: int = 0,
x_session_token: Optional[str] = Header(None),
authorization: Optional[str] = Header(None)
):
user_id = get_session_user(x_session_token, authorization)
conn = db.get_conn()
c = conn.cursor()
time_cutoff = 0
now = timelib.time()
if time == "hour":
time_cutoff = now - 3600
elif time == "day":
time_cutoff = now - 86400
elif time == "week":
time_cutoff = now - 604800
elif time == "month":
time_cutoff = now - 2592000
elif time == "year":
time_cutoff = now - 31536000
base_query = """SELECT p.*, u.username, c.name as community_name,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comment_count,
(SELECT vote FROM votes WHERE user_id = ? AND target_type = 'post' AND target_id = p.id) as user_vote
FROM posts p
LEFT JOIN users u ON p.user_id = u.id
LEFT JOIN communities c ON p.community_id = c.id"""
where_clauses = []
params = [user_id if user_id else 0]
if community_id:
where_clauses.append("p.community_id = ?")
params.append(community_id)
if feed == "home" and user_id:
where_clauses.append("p.community_id IN (SELECT community_id FROM subscriptions WHERE user_id = ?)")
params.append(user_id)
is_subscribed = False
if user_id:
where_clauses.append("p.id NOT IN (SELECT post_id FROM hidden WHERE user_id = ?)")
params.append(user_id)
if community_id:
is_subscribed_query = "SELECT community_id FROM subscriptions WHERE user_id = ? AND community_id = ?"
c.execute(is_subscribed_query, (user_id, community_id))
result = c.fetchone()
is_subscribed = result and str(dict(result).get("community_id")) == str(community_id)
if time_cutoff > 0:
where_clauses.append("p.created_at >= ?")
params.append(time_cutoff)
if where_clauses:
base_query += " WHERE " + " AND ".join(where_clauses)
if sort == "new":
base_query += " ORDER BY p.sticky DESC, p.created_at DESC"
elif sort == "top":
base_query += " ORDER BY p.sticky DESC, p.score DESC"
elif sort == "old":
base_query += " ORDER BY p.sticky DESC, p.created_at ASC"
else:
base_query += " ORDER BY p.sticky DESC, p.score DESC"
base_query += f" LIMIT {limit} OFFSET {offset}"
c.execute(base_query, params)
posts = [dict(row) for row in c.fetchall()]
if sort == "hot":
for post in posts:
if post["sticky"]:
post["hot_score"] = float('inf')
else:
post["hot_score"] = calculate_hot_score(post["score"], post["created_at"])
posts.sort(key=lambda x: x["hot_score"], reverse=True)
elif sort == "controversial":
if posts:
post_ids = [str(p["id"]) for p in posts]
c.execute(f"""
SELECT target_id,
SUM(CASE WHEN vote = 1 THEN 1 ELSE 0 END) as upvotes,
SUM(CASE WHEN vote = -1 THEN 1 ELSE 0 END) as downvotes
FROM votes
WHERE target_type = 'post' AND target_id IN ({','.join(['?']*len(post_ids))})
GROUP BY target_id
""", post_ids)
vote_counts = {row["target_id"]: (row["upvotes"], row["downvotes"]) for row in c.fetchall()}
for post in posts:
upvotes, downvotes = vote_counts.get(post["id"], (0, 0))
post["controversial_score"] = calculate_controversial_score(upvotes, downvotes)
posts.sort(key=lambda x: x.get("controversial_score", 0), reverse=True)
elif sort == "rising":
recent_cutoff = now - 7200
rising_posts = [p for p in posts if p["created_at"] >= recent_cutoff]
for post in rising_posts:
age_hours = (now - post["created_at"]) / 3600
post["rising_score"] = post["score"] / max(age_hours, 0.1)
rising_posts.sort(key=lambda x: x["rising_score"], reverse=True)
posts = rising_posts
conn.close()
return {"posts": posts, "is_subscribed": is_subscribed}
@app.get("/api/post")
async def api_get_post(id: int, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT p.*, u.username, c.name as community_name,
(SELECT COUNT(*) FROM comments WHERE post_id = p.id) as comment_count,
(SELECT vote FROM votes WHERE user_id = ? AND target_type = 'post' AND target_id = p.id) as user_vote
FROM posts p
LEFT JOIN users u ON p.user_id = u.id
LEFT JOIN communities c ON p.community_id = c.id
WHERE p.id = ?""", (user_id if user_id else 0, id))
post = c.fetchone()
conn.close()
if post:
return {"success": True, "post": dict(post)}
return {"success": False, "error": "Post not found"}
@app.post("/api/create_post")
async def api_create_post(request: CreatePostRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("""INSERT INTO posts (community_id, user_id, title, content, url, post_type, flair, nsfw, spoiler)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(request.community_id, user_id, request.title, request.content, request.url,
request.type, request.flair, request.nsfw, request.spoiler))
post_id = c.lastrowid
conn.commit()
conn.close()
return {"success": True, "post_id": post_id}
@app.post("/api/vote")
async def api_vote(request: VoteRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT vote FROM votes WHERE user_id = ? AND target_type = ? AND target_id = ?",
(user_id, request.target_type, request.target_id))
existing = c.fetchone()
if existing:
old_vote = existing["vote"]
if request.vote == 0:
c.execute("DELETE FROM votes WHERE user_id = ? AND target_type = ? AND target_id = ?",
(user_id, request.target_type, request.target_id))
score_change = -old_vote
else:
c.execute("UPDATE votes SET vote = ? WHERE user_id = ? AND target_type = ? AND target_id = ?",
(request.vote, user_id, request.target_type, request.target_id))
score_change = request.vote - old_vote
else:
if request.vote != 0:
c.execute("INSERT INTO votes (user_id, target_type, target_id, vote) VALUES (?, ?, ?, ?)",
(user_id, request.target_type, request.target_id, request.vote))
score_change = request.vote
else:
score_change = 0
if score_change != 0:
if request.target_type == "post":
c.execute("UPDATE posts SET score = score + ? WHERE id = ?", (score_change, request.target_id))
c.execute("SELECT user_id FROM posts WHERE id = ?", (request.target_id,))
post = c.fetchone()
if post:
c.execute("UPDATE users SET karma = karma + ? WHERE id = ?", (score_change, post["user_id"]))
elif request.target_type == "comment":
c.execute("UPDATE comments SET score = score + ? WHERE id = ?", (score_change, request.target_id))
c.execute("SELECT user_id FROM comments WHERE id = ?", (request.target_id,))
comment = c.fetchone()
if comment:
c.execute("UPDATE users SET karma = karma + ? WHERE id = ?", (score_change, comment["user_id"]))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/comment")
async def api_create_comment(request: CreateCommentRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("INSERT INTO comments (post_id, parent_id, user_id, content) VALUES (?, ?, ?, ?)",
(request.post_id, request.parent_id, user_id, request.content))
comment_id = c.lastrowid
conn.commit()
conn.close()
return {"success": True, "comment_id": comment_id}
@app.get("/api/comments")
async def api_get_comments(post_id: int, sort: str = "best", x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT c.*, u.username,
(SELECT vote FROM votes WHERE user_id = ? AND target_type = 'comment' AND target_id = c.id) as user_vote
FROM comments c
LEFT JOIN users u ON c.user_id = u.id
WHERE c.post_id = ?""", (user_id if user_id else 0, post_id))
comments = [dict(row) for row in c.fetchall()]
if sort == "top":
comments.sort(key=lambda x: x["score"], reverse=True)
elif sort == "new":
comments.sort(key=lambda x: x["created_at"], reverse=True)
elif sort == "old":
comments.sort(key=lambda x: x["created_at"])
elif sort == "controversial":
if comments:
comment_ids = [str(cm["id"]) for cm in comments]
c.execute(f"""
SELECT target_id,
SUM(CASE WHEN vote = 1 THEN 1 ELSE 0 END) as upvotes,
SUM(CASE WHEN vote = -1 THEN 1 ELSE 0 END) as downvotes
FROM votes
WHERE target_type = 'comment' AND target_id IN ({','.join(['?']*len(comment_ids))})
GROUP BY target_id
""", comment_ids)
vote_counts = {row["target_id"]: (row["upvotes"], row["downvotes"]) for row in c.fetchall()}
for comment in comments:
upvotes, downvotes = vote_counts.get(comment["id"], (0, 0))
comment["controversial_score"] = calculate_controversial_score(upvotes, downvotes)
comments.sort(key=lambda x: x.get("controversial_score", 0), reverse=True)
else:
comments.sort(key=lambda x: (x["score"], -x["created_at"]), reverse=True)
conn.close()
return {"comments": comments}
@app.post("/api/subscribe")
async def api_subscribe(request: SubscribeRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
if request.action == "subscribe":
try:
c.execute("INSERT INTO subscriptions (user_id, community_id) VALUES (?, ?)",
(user_id, request.community_id))
conn.commit()
except sqlite3.IntegrityError:
pass
elif request.action == "unsubscribe":
c.execute("DELETE FROM subscriptions WHERE user_id = ? AND community_id = ?",
(user_id, request.community_id))
conn.commit()
conn.close()
return {"success": True}
@app.get("/api/subscriptions")
async def api_get_subscriptions(x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT c.* FROM communities c
JOIN subscriptions s ON c.id = s.community_id
WHERE s.user_id = ?""", (user_id,))
subscriptions = [dict(row) for row in c.fetchall()]
conn.close()
return {"subscriptions": subscriptions}
@app.get("/api/user")
async def api_get_user(id: Optional[int] = None, username: Optional[str] = None):
conn = db.get_conn()
c = conn.cursor()
if id:
c.execute("SELECT id, username, karma, created_at FROM users WHERE id = ?", (id,))
elif username:
c.execute("SELECT id, username, karma, created_at FROM users WHERE username = ?", (username,))
else:
conn.close()
return {"success": False, "error": "ID or username required"}
user = c.fetchone()
if not user:
conn.close()
return {"success": False, "error": "User not found"}
user_dict = dict(user)
c.execute("""SELECT p.*,u.username, c.name as community_name FROM posts p
LEFT JOIN communities c ON p.community_id = c.id
INNER JOIN users u ON p.user_id = u.id
WHERE p.user_id = ? ORDER BY p.created_at DESC LIMIT 20""", (user_dict["id"],))
posts = [dict(row) for row in c.fetchall()]
c.execute("""SELECT c.*,u.username, p.title as post_title FROM comments c
LEFT JOIN posts p ON c.post_id = p.id
INNER JOIN users u ON c.user_id = u.id
WHERE c.user_id = ? ORDER BY c.created_at DESC LIMIT 20""", (user_dict["id"],))
comments = [dict(row) for row in c.fetchall()]
conn.close()
return {"success": True, "user": user_dict, "posts": posts, "comments": comments}
@app.post("/api/save")
async def api_save(request: SaveRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
if request.action == "save":
try:
c.execute("INSERT INTO saved (user_id, target_type, target_id) VALUES (?, ?, ?)",
(user_id, request.target_type, request.target_id))
conn.commit()
except sqlite3.IntegrityError:
pass
elif request.action == "unsave":
c.execute("DELETE FROM saved WHERE user_id = ? AND target_type = ? AND target_id = ?",
(user_id, request.target_type, request.target_id))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/hide")
async def api_hide(request: HideRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
if request.action == "hide":
try:
c.execute("INSERT INTO hidden (user_id, post_id) VALUES (?, ?)",
(user_id, request.post_id))
conn.commit()
except sqlite3.IntegrityError:
pass
elif request.action == "unhide":
c.execute("DELETE FROM hidden WHERE user_id = ? AND post_id = ?",
(user_id, request.post_id))
conn.commit()
conn.close()
return {"success": True}
@app.get("/api/saved")
async def api_get_saved(x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT p.*, u.username, c.name as community_name, 'post' as type
FROM posts p
LEFT JOIN users u ON p.user_id = u.id
LEFT JOIN communities c ON p.community_id = c.id
WHERE p.id IN (SELECT target_id FROM saved WHERE user_id = ? AND target_type = 'post')
ORDER BY p.created_at DESC""", (user_id,))
saved_posts = [dict(row) for row in c.fetchall()]
c.execute("""SELECT c.*, u.username, 'comment' as type
FROM comments c
LEFT JOIN users u ON c.user_id = u.id
WHERE c.id IN (SELECT target_id FROM saved WHERE user_id = ? AND target_type = 'comment')
ORDER BY c.created_at DESC""", (user_id,))
saved_comments = [dict(row) for row in c.fetchall()]
conn.close()
return {"posts": saved_posts, "comments": saved_comments}
@app.get("/api/search")
async def api_search(q: str = "", community_id: Optional[int] = None, type: str = "all"):
if not q:
return {"posts": [], "comments": [], "communities": []}
conn = db.get_conn()
c = conn.cursor()
search_term = f"%{q}%"
posts = []
comments = []
communities = []
if type in ["all", "posts"]:
post_query = """SELECT p.*, u.username, c.name as community_name FROM posts p
LEFT JOIN users u ON p.user_id = u.id
LEFT JOIN communities c ON p.community_id = c.id
WHERE (p.title LIKE ? OR p.content LIKE ?)"""
params = [search_term, search_term]
if community_id:
post_query += " AND p.community_id = ?"
params.append(community_id)
post_query += " ORDER BY p.created_at DESC LIMIT 50"
c.execute(post_query, params)
posts = [dict(row) for row in c.fetchall()]
if type in ["all", "comments"]:
comment_query = """SELECT c.*, u.username FROM comments c
LEFT JOIN users u ON c.user_id = u.id
WHERE c.content LIKE ?"""
params = [search_term]
if community_id:
comment_query += " AND c.post_id IN (SELECT id FROM posts WHERE community_id = ?)"
params.append(community_id)
comment_query += " ORDER BY c.created_at DESC LIMIT 50"
c.execute(comment_query, params)
comments = [dict(row) for row in c.fetchall()]
if type in ["all", "communities"]:
c.execute("""SELECT * FROM communities
WHERE name LIKE ? OR description LIKE ?
ORDER BY created_at DESC LIMIT 20""", (search_term, search_term))
communities = [dict(row) for row in c.fetchall()]
conn.close()
return {"posts": posts, "comments": comments, "communities": communities}
@app.post("/api/message")
async def api_send_message(request: SendMessageRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT id FROM users WHERE username = ?", (request.to_username,))
to_user = c.fetchone()
if not to_user:
conn.close()
return {"success": False, "error": "User not found"}
c.execute("INSERT INTO messages (from_user_id, to_user_id, subject, content) VALUES (?, ?, ?, ?)",
(user_id, to_user["id"], request.subject, request.content))
conn.commit()
conn.close()
return {"success": True}
@app.get("/api/messages")
async def api_get_messages(x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("""SELECT m.*, u.username as from_username FROM messages m
LEFT JOIN users u ON m.from_user_id = u.id
WHERE m.to_user_id = ?
ORDER BY m.created_at DESC""", (user_id,))
messages = [dict(row) for row in c.fetchall()]
c.execute("UPDATE messages SET read = 1 WHERE to_user_id = ?", (user_id,))
conn.commit()
conn.close()
return {"messages": messages}
@app.post("/api/sticky")
async def api_sticky(request: StickyRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT community_id FROM posts WHERE id = ?", (request.post_id,))
post = c.fetchone()
if not post:
conn.close()
return {"success": False, "error": "Post not found"}
c.execute("SELECT * FROM moderators WHERE community_id = ? AND user_id = ?",
(post["community_id"], user_id))
is_mod = c.fetchone()
if not is_mod:
conn.close()
return {"success": False, "error": "Not a moderator"}
c.execute("UPDATE posts SET sticky = ? WHERE id = ?", (request.sticky, request.post_id))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/lock")
async def api_lock(request: LockRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT community_id FROM posts WHERE id = ?", (request.post_id,))
post = c.fetchone()
if not post:
conn.close()
return {"success": False, "error": "Post not found"}
c.execute("SELECT * FROM moderators WHERE community_id = ? AND user_id = ?",
(post["community_id"], user_id))
is_mod = c.fetchone()
if not is_mod:
conn.close()
return {"success": False, "error": "Not a moderator"}
c.execute("UPDATE posts SET locked = ? WHERE id = ?", (request.locked, request.post_id))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/set_flair")
async def api_set_flair(request: SetFlairRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("INSERT OR REPLACE INTO user_flair (user_id, community_id, flair) VALUES (?, ?, ?)",
(user_id, request.community_id, request.flair))
conn.commit()
conn.close()
return {"success": True}
@app.post("/api/update_community")
async def api_update_community(request: UpdateCommunityRequest, x_session_token: Optional[str] = Header(None), authorization: Optional[str] = Header(None)):
user_id = get_session_user(x_session_token, authorization)
if not user_id:
return {"success": False, "error": "Not logged in"}
conn = db.get_conn()
c = conn.cursor()
c.execute("SELECT * FROM moderators WHERE community_id = ? AND user_id = ?",
(request.community_id, user_id))
is_mod = c.fetchone()
if not is_mod:
conn.close()
return {"success": False, "error": "Not a moderator"}
updates = []
params = []
if request.rules is not None:
updates.append("rules = ?")
params.append(request.rules)
if request.wiki is not None:
updates.append("wiki = ?")
params.append(request.wiki)
if request.sidebar is not None:
updates.append("sidebar = ?")
params.append(request.sidebar)
if updates:
params.append(request.community_id)
c.execute(f"UPDATE communities SET {', '.join(updates)} WHERE id = ?", params)
conn.commit()
conn.close()
return {"success": True}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8589)