import asyncio
import fcntl
import logging
import os
import time
from collections import defaultdict
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.exceptions import RequestValidationError
from devplacepy.config import STATIC_DIR, PORT, SERVICE_LOCK_FILE
from devplacepy.database import init_db, get_table, db, get_users_by_uids, get_comment_counts_by_post_uids, get_vote_counts, get_news_images_by_uids
from devplacepy.templating import templates
from devplacepy.utils import get_current_user, time_ago
from devplacepy.seo import base_seo_context, site_url, website_schema
from devplacepy.routers import auth, feed, posts, comments, projects, profile, messages, notifications, votes, avatar, follow, admin, seo, bugs, news, gists, services as services_router, uploads, push, leaderboard
from devplacepy.services.manager import service_manager
from devplacepy.services.news import NewsService
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)
_rate_limit_store = defaultdict(list)
RATE_LIMIT = int(os.environ.get("DEVPLACE_RATE_LIMIT", "60"))
RATE_WINDOW = 60
_service_lock_handle = None
def acquire_service_lock() -> bool:
global _service_lock_handle
handle = open(SERVICE_LOCK_FILE, "w")
try:
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
handle.close()
return False
_service_lock_handle = handle
return True
class UploadStaticFiles(StaticFiles):
async def get_response(self, path, scope):
response = await super().get_response(path, scope)
response.headers["Content-Disposition"] = "attachment"
return response
app = FastAPI(title="DevPlace")
app.mount("/static/uploads", UploadStaticFiles(directory=str(STATIC_DIR / "uploads"), check_dir=False), name="uploads")
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@app.exception_handler(404)
async def not_found(request: Request, exc):
seo_ctx = base_seo_context(request, title="Not Found - DevPlace", description="The page you requested does not exist.", robots="noindex")
return templates.TemplateResponse(request, "error.html", {**seo_ctx, "request": request, "error_code": 404, "error_message": "Page not found"}, status_code=404)
@app.exception_handler(500)
async def server_error(request: Request, exc):
logger.exception("500 error on %s %s", request.method, request.url.path)
seo_ctx = base_seo_context(request, title="Server Error - DevPlace", description="Something went wrong.", robots="noindex")
return templates.TemplateResponse(request, "error.html", {**seo_ctx, "request": request, "error_code": 500, "error_message": "Internal server error"}, status_code=500)
_AUTH_FORM_PAGES = {
"/auth/signup": ("signup.html", "Join DevPlace"),
"/auth/login": ("login.html", "Sign In"),
"/auth/forgot-password": ("forgot_password.html", "Reset Password"),
}
_FRIENDLY_ERRORS = {
("username", "too_short"): "Username must be between 3 and 32 characters",
("username", "too_long"): "Username must be between 3 and 32 characters",
("password", "too_short"): "Password must be at least 6 characters",
}
def _friendly_error(err):
field = err["loc"][-1] if err.get("loc") else ""
key = (field, err.get("type", "").replace("string_", ""))
if key in _FRIENDLY_ERRORS:
return _FRIENDLY_ERRORS[key]
msg = err.get("msg", "Invalid input")
prefix = "Value error, "
return msg[len(prefix):] if msg.startswith(prefix) else msg
@app.exception_handler(RequestValidationError)
async def on_validation_error(request: Request, exc: RequestValidationError):
errors = [_friendly_error(e) for e in exc.errors()]
path = request.url.path
page = _AUTH_FORM_PAGES.get(path)
if page is None and path.startswith("/auth/reset-password/"):
page = ("reset_password.html", "Set New Password")
if page:
template_name, title = page
context = {**base_seo_context(request, title=title, robots="noindex,nofollow"), "request": request, "errors": errors}
try:
form = await request.form()
context.update({k: v for k, v in form.items() if isinstance(v, str)})
except Exception:
pass
if "token" in request.path_params:
context["token"] = request.path_params["token"]
return templates.TemplateResponse(request, template_name, context, status_code=400)
referer = request.headers.get("referer") or "/feed"
return RedirectResponse(url=referer, status_code=303)
app.include_router(auth.router, prefix="/auth")
app.include_router(feed.router, prefix="/feed")
app.include_router(posts.router, prefix="/posts")
app.include_router(comments.router, prefix="/comments")
app.include_router(projects.router, prefix="/projects")
app.include_router(profile.router, prefix="/profile")
app.include_router(messages.router, prefix="/messages")
app.include_router(notifications.router, prefix="/notifications")
app.include_router(votes.router, prefix="/votes")
app.include_router(avatar.router, prefix="/avatar")
app.include_router(follow.router, prefix="/follow")
app.include_router(leaderboard.router, prefix="/leaderboard")
app.include_router(admin.router, prefix="/admin")
app.include_router(seo.router)
app.include_router(push.router)
app.include_router(bugs.router, prefix="/bugs")
app.include_router(gists.router, prefix="/gists")
app.include_router(news.router, prefix="/news")
app.include_router(services_router.router, prefix="/admin/services")
app.include_router(uploads.router, prefix="/uploads")
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
if not response.headers.get("X-Robots-Tag"):
response.headers["X-Robots-Tag"] = "index, follow"
response.headers["X-Content-Type-Options"] = "nosniff"
return response
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
if request.method in ("POST", "PUT", "DELETE", "PATCH"):
ip = request.headers.get("X-Real-IP") or (request.client.host if request.client else "unknown")
now = time.time()
window_start = now - RATE_WINDOW
_rate_limit_store[ip] = [t for t in _rate_limit_store[ip] if t > window_start]
if len(_rate_limit_store[ip]) >= RATE_LIMIT:
return HTMLResponse("Rate limit exceeded. Try again later.", status_code=429)
_rate_limit_store[ip].append(now)
return await call_next(request)
@app.on_event("startup")
async def startup():
init_db()
from devplacepy.push import ensure_certificates
ensure_certificates()
if not os.environ.get("DEVPLACE_DISABLE_SERVICES"):
if acquire_service_lock():
service_manager.register(NewsService())
asyncio.create_task(service_manager.start_all())
logger.info(f"Background services started in worker pid {os.getpid()}")
else:
logger.info(f"Worker pid {os.getpid()} declined service lock; another worker owns background services")
logger.info(f"DevPlace started on port {PORT}")
@app.on_event("shutdown")
async def shutdown():
logger.info("Shutting down services...")
await service_manager.stop_all()
@app.get("/")
async def landing(request: Request):
user = get_current_user(request)
if user:
return RedirectResponse(url="/feed", status_code=302)
landing_articles = []
if "news" in db.tables:
news_table = get_table("news")
raw = list(news_table.find(show_on_landing=1, order_by=["-synced_at"], _limit=6))
images_by_news = get_news_images_by_uids([a["uid"] for a in raw])
for a in raw:
landing_articles.append({
"uid": a["uid"],
"slug": a.get("slug", ""),
"title": a.get("title", ""),
"description": (a.get("description", "") or "")[:250],
"url": a.get("url", ""),
"source_name": a.get("source_name", ""),
"grade": a.get("grade", 0),
"synced_at": a.get("synced_at", "") or "",
"time_ago": time_ago(a["synced_at"]),
"image_url": images_by_news.get(a["uid"], ""),
})
landing_posts = []
if "posts" in db.tables:
posts_table = get_table("posts")
raw_posts = list(posts_table.find(order_by=["-created_at"], _limit=6))
if raw_posts:
post_uids = [p["uid"] for p in raw_posts]
author_uids = [p["user_uid"] for p in raw_posts]
authors = get_users_by_uids(author_uids)
comment_counts = get_comment_counts_by_post_uids(post_uids)
upvotes, downvotes = get_vote_counts(post_uids)
for p in raw_posts:
landing_posts.append({
"post": p,
"author": authors.get(p["user_uid"]),
"time_ago": time_ago(p["created_at"]),
"comment_count": comment_counts.get(p["uid"], 0),
"stars": upvotes.get(p["uid"], 0) - downvotes.get(p["uid"], 0),
"slug": p.get("slug", "") or p["uid"],
})
base = site_url(request)
seo_ctx = base_seo_context(
request,
title="DevPlace - The Developer Social Network",
description="Track industry shifts. Discover bold releases. Share what you're building in an open, uncensored environment.",
breadcrumbs=[],
schemas=[website_schema(base)],
)
return templates.TemplateResponse(request, "landing.html", {
**seo_ctx, "request": request,
"landing_articles": landing_articles,
"landing_posts": landing_posts,
})