|
# retoor <retoor@molodetz.nl>
|
|
from datetime import datetime, date
|
|
from decimal import Decimal
|
|
from typing import Optional
|
|
import math
|
|
|
|
from fastapi import APIRouter, Request, Form, Depends, Query
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from mywebdav.models import User, Activity
|
|
from mywebdav.billing.models import (
|
|
Invoice, InvoiceLineItem, PricingConfig,
|
|
UserSubscription, UsageAggregate
|
|
)
|
|
from mywebdav.admin_auth import (
|
|
verify_admin_credentials, get_admin_session,
|
|
create_session_response, clear_session_response,
|
|
generate_csrf_token, verify_csrf_token
|
|
)
|
|
from mywebdav.auth import get_password_hash
|
|
|
|
# Router for the admin panel; every route defined below is mounted under /manage.
router = APIRouter(prefix="/manage", tags=["admin-panel"])
|
|
# Jinja2 template loader; the directory is a relative path, so it is resolved
# against the process working directory.
templates = Jinja2Templates(directory="mywebdav/templates")
|
|
|
|
|
|
def format_bytes(bytes_value: int) -> str:
    """Format a byte count as a human-readable string with one decimal place.

    The value is divided by 1024 until its magnitude drops below 1024 or the
    unit list (B, KB, MB, GB, TB) is exhausted, in which case PB is used.
    ``None`` is rendered as ``"0 B"``.
    """
    if bytes_value is None:
        return "0 B"

    scaled = bytes_value
    for suffix in ("B", "KB", "MB", "GB", "TB"):
        if abs(scaled) < 1024.0:
            return f"{scaled:.1f} {suffix}"
        scaled = scaled / 1024.0

    # Anything still >= 1024 TB is reported in petabytes.
    return f"{scaled:.1f} PB"
|
|
|
|
|
|
def format_currency(value: float) -> str:
    """Format a number as a dollar amount with two decimal places."""
    return "${:.2f}".format(value)
|
|
|
|
|
|
# Register the formatting helpers as Jinja filters so templates can call them.
templates.env.filters.update(
    format_bytes=format_bytes,
    format_currency=format_currency,
)
|
|
|
|
|
|
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: Optional[str] = None):
    """Render the admin login form.

    If the request already carries an admin session, redirect to the
    dashboard instead. The optional ``error`` query parameter is passed to
    the template unchanged.
    """
    if get_admin_session(request):
        return RedirectResponse(url="/manage/", status_code=303)

    context = {
        "request": request,
        "error": error,
    }
    return templates.TemplateResponse("admin/login.html", context)
|
|
|
|
|
|
@router.post("/login")
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...)
):
    """Handle the login form submission.

    On valid credentials, redirect to the dashboard with a session attached
    by ``create_session_response``; otherwise redirect back to the login page
    with ``error=invalid``.
    """
    if not verify_admin_credentials(username, password):
        return RedirectResponse(url="/manage/login?error=invalid", status_code=303)

    dashboard_redirect = RedirectResponse(url="/manage/", status_code=303)
    return create_session_response(dashboard_redirect, username)
|
|
|
|
|
|
@router.get("/logout")
async def logout(request: Request):
    """Clear the admin session and redirect to the login page."""
    return clear_session_response(
        RedirectResponse(url="/manage/login", status_code=303)
    )
|
|
|
|
|
|
@router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
    """Render the admin dashboard.

    Redirects to the login page when there is no admin session. Otherwise
    collects user counts, total used storage, revenue from paid invoices whose
    period starts in the current month, the number of open invoices and the
    ten most recent activities, and renders ``admin/dashboard.html``.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    total_users = await User.all().count()
    active_users = await User.filter(is_active=True).count()
    inactive_users = total_users - active_users

    # Fetch only the storage column rather than hydrating every User row.
    storage_values = await User.all().values_list("used_storage_bytes", flat=True)
    total_storage = sum(value or 0 for value in storage_values)

    current_month = date.today().replace(day=1)
    paid_invoices = await Invoice.filter(
        status="paid",
        period_start__gte=current_month
    ).all()
    monthly_revenue = sum(float(inv.total) for inv in paid_invoices)

    pending_invoices = await Invoice.filter(status="open").count()

    recent_activities = await Activity.all().order_by("-timestamp").limit(10)

    # Resolve all usernames with one query instead of one query per activity.
    actor_ids = {
        act.user_id for act in recent_activities if act.user_id is not None
    }
    usernames = {}
    if actor_ids:
        rows = await User.filter(id__in=list(actor_ids)).values_list("id", "username")
        usernames = dict(rows)

    activity_list = [
        {
            # Activities whose user no longer exists are shown as "Unknown".
            "user": usernames.get(act.user_id, "Unknown"),
            "action": act.action,
            "timestamp": act.timestamp
        }
        for act in recent_activities
    ]

    return templates.TemplateResponse("admin/dashboard.html", {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "stats": {
            "total_users": total_users,
            "active_users": active_users,
            "inactive_users": inactive_users,
            "total_storage": total_storage,
            "monthly_revenue": monthly_revenue,
            "pending_invoices": pending_invoices
        },
        "recent_activities": activity_list
    })
|
|
|
|
|
|
@router.get("/users", response_class=HTMLResponse)
async def users_list(
    request: Request,
    search: Optional[str] = None,
    status: Optional[str] = None,
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=5, le=100)
):
    """Render the paginated user list.

    Redirects to the login page when there is no admin session.

    Args:
        search: Case-insensitive substring matched against username OR email.
        status: ``"active"``, ``"inactive"`` or ``"superuser"``; any other
            value applies no status filter.
        page: 1-based page number.
        per_page: Page size (5-100).
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    query = User.all()

    if search:
        # QuerySets cannot be combined with `|` (that raised TypeError on
        # every search). Collect the ids matching either column and filter on
        # their union; this needs no extra import. NOTE(review): a single
        # query using the ORM's Q expressions would be the alternative.
        username_ids = await User.filter(
            username__icontains=search
        ).values_list("id", flat=True)
        email_ids = await User.filter(
            email__icontains=search
        ).values_list("id", flat=True)
        matched_ids = list({*username_ids, *email_ids})
        query = query.filter(id__in=matched_ids)

    if status == "active":
        query = query.filter(is_active=True)
    elif status == "inactive":
        query = query.filter(is_active=False)
    elif status == "superuser":
        query = query.filter(is_superuser=True)

    total = await query.count()
    # Always report at least one page so the template can render a pager.
    total_pages = math.ceil(total / per_page) if total > 0 else 1
    offset = (page - 1) * per_page

    users = await query.order_by("-created_at").offset(offset).limit(per_page)

    return templates.TemplateResponse("admin/users/list.html", {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "users": users,
        "search": search or "",
        "status": status or "",
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages
    })
|
|
|
|
|
|
@router.get("/users/{user_id}", response_class=HTMLResponse)
async def user_detail(request: Request, user_id: int):
    """Render the detail page for one user.

    Redirects to the login page when there is no admin session, and to the
    user list with ``error=not_found`` when the user does not exist. The page
    shows the user's subscription, their five most recently created invoices
    and storage usage as a percentage capped at 100.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    user = await User.get_or_none(id=user_id)
    if not user:
        return RedirectResponse(url="/manage/users?error=not_found", status_code=303)

    subscription = await UserSubscription.get_or_none(user_id=user_id)
    recent_invoices = (
        await Invoice.filter(user_id=user_id).order_by("-created_at").limit(5)
    )

    # A missing or non-positive quota yields 0% rather than dividing by zero.
    if user.storage_quota_bytes and user.storage_quota_bytes > 0:
        used_pct = (user.used_storage_bytes or 0) / user.storage_quota_bytes * 100
    else:
        used_pct = 0

    context = {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "user": user,
        "subscription": subscription,
        "invoices": recent_invoices,
        "usage_percent": min(100, used_pct),
    }
    return templates.TemplateResponse("admin/users/detail.html", context)
|
|
|
|
|
|
@router.post("/users/{user_id}")
async def user_update(
    request: Request,
    user_id: int,
    csrf_token: str = Form(...),
    username: str = Form(...),
    email: str = Form(...),
    password: Optional[str] = Form(None),
    storage_quota_gb: float = Form(...),
    plan_type: str = Form(...),
    is_active: bool = Form(False),
    is_superuser: bool = Form(False)
):
    """Apply the edit form to a user and redirect back to the detail page.

    Redirects to the login page without a session, to the detail page with
    ``error=csrf`` on a bad CSRF token, to the user list with
    ``error=not_found`` for an unknown user, and to the detail page with
    ``error=invalid_quota`` when the quota is negative or not a finite number.
    The password is only changed when a non-blank value is submitted.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    if not verify_csrf_token(session, csrf_token):
        return RedirectResponse(url=f"/manage/users/{user_id}?error=csrf", status_code=303)

    user = await User.get_or_none(id=user_id)
    if not user:
        return RedirectResponse(url="/manage/users?error=not_found", status_code=303)

    # int() raises on NaN/inf and a negative quota is meaningless, so reject
    # both before touching the model.
    if not math.isfinite(storage_quota_gb) or storage_quota_gb < 0:
        return RedirectResponse(
            url=f"/manage/users/{user_id}?error=invalid_quota", status_code=303
        )

    user.username = username
    user.email = email
    # The form carries gigabytes; the model stores bytes.
    user.storage_quota_bytes = int(storage_quota_gb * 1024 * 1024 * 1024)
    user.plan_type = plan_type
    user.is_active = is_active
    user.is_superuser = is_superuser

    # A blank password field means "keep the current password".
    if password and password.strip():
        user.hashed_password = get_password_hash(password)

    await user.save()

    return RedirectResponse(url=f"/manage/users/{user_id}?success=1", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/delete")
async def user_delete(
    request: Request,
    user_id: int,
    csrf_token: str = Form(...)
):
    """Delete a user and redirect to the user list with ``deleted=1``.

    Redirects to the login page without a session and to the user's detail
    page with ``error=csrf`` on a bad CSRF token. An unknown ``user_id`` is
    ignored and still ends in the ``deleted=1`` redirect.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    token_ok = verify_csrf_token(session, csrf_token)
    if not token_ok:
        return RedirectResponse(url=f"/manage/users/{user_id}?error=csrf", status_code=303)

    target = await User.get_or_none(id=user_id)
    if target:
        await target.delete()

    return RedirectResponse(url="/manage/users?deleted=1", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/toggle-active")
async def user_toggle_active(
    request: Request,
    user_id: int,
    csrf_token: str = Form(...)
):
    """Flip a user's ``is_active`` flag and redirect to the user list.

    Redirects to the login page without a session and to the user list with
    ``error=csrf`` on a bad CSRF token. An unknown ``user_id`` is ignored.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    token_ok = verify_csrf_token(session, csrf_token)
    if not token_ok:
        return RedirectResponse(url="/manage/users?error=csrf", status_code=303)

    target = await User.get_or_none(id=user_id)
    if target:
        currently_active = target.is_active
        target.is_active = not currently_active
        await target.save()

    return RedirectResponse(url="/manage/users", status_code=303)
|
|
|
|
|
|
@router.get("/payments", response_class=HTMLResponse)
async def payments_list(
    request: Request,
    status: Optional[str] = None,
    user_id: Optional[int] = None,
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=5, le=100)
):
    """Render the paginated invoice list with revenue and pending summaries.

    Redirects to the login page when there is no admin session.

    Args:
        status: Optional invoice status to filter on.
        user_id: Optional user id to filter on.
        page: 1-based page number.
        per_page: Page size (5-100).

    The summary figures (total paid revenue, open invoice count and amount)
    are computed over all invoices and ignore the filters above.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    query = Invoice.all()

    if status:
        query = query.filter(status=status)
    if user_id:
        query = query.filter(user_id=user_id)

    total = await query.count()
    # Always report at least one page so the template can render a pager.
    total_pages = math.ceil(total / per_page) if total > 0 else 1
    offset = (page - 1) * per_page

    invoices = await query.order_by("-created_at").offset(offset).limit(per_page)

    # Load the owners of this page's invoices in one query instead of one
    # query per invoice.
    owner_ids = {inv.user_id for inv in invoices if inv.user_id is not None}
    owners = {}
    if owner_ids:
        owners = {
            owner.id: owner
            for owner in await User.filter(id__in=list(owner_ids))
        }

    invoice_list = [
        {
            "invoice": inv,
            # None when the owning user no longer exists.
            "user": owners.get(inv.user_id)
        }
        for inv in invoices
    ]

    # Only the `total` column is needed for the summaries.
    paid_totals = await Invoice.filter(status="paid").values_list("total", flat=True)
    revenue_sum = sum(float(amount) for amount in paid_totals)

    # One query yields both the count and the sum, so the two always agree.
    pending_totals = await Invoice.filter(status="open").values_list("total", flat=True)
    pending_count = len(pending_totals)
    pending_sum = sum(float(amount) for amount in pending_totals)

    return templates.TemplateResponse("admin/payments/list.html", {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "invoices": invoice_list,
        "status_filter": status or "",
        "user_id_filter": user_id,
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages,
        "summary": {
            "total_revenue": revenue_sum,
            "pending_count": pending_count,
            "pending_amount": pending_sum
        }
    })
|
|
|
|
|
|
@router.get("/payments/{invoice_id}", response_class=HTMLResponse)
async def payment_detail(request: Request, invoice_id: int):
    """Render the detail page for one invoice, with its owner and line items.

    Redirects to the login page when there is no admin session, and to the
    payments list with ``error=not_found`` when the invoice does not exist.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    invoice = await Invoice.get_or_none(id=invoice_id)
    if not invoice:
        return RedirectResponse(url="/manage/payments?error=not_found", status_code=303)

    owner = await User.get_or_none(id=invoice.user_id)
    items = await InvoiceLineItem.filter(invoice_id=invoice_id).all()

    context = {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "invoice": invoice,
        "user": owner,
        "line_items": items,
    }
    return templates.TemplateResponse("admin/payments/detail.html", context)
|
|
|
|
|
|
@router.post("/payments/{invoice_id}/mark-paid")
async def payment_mark_paid(
    request: Request,
    invoice_id: int,
    csrf_token: str = Form(...)
):
    """Mark an invoice as paid and redirect to its detail page.

    Redirects to the login page without a session, to the detail page with
    ``error=csrf`` on a bad CSRF token, and to the payments list with
    ``error=not_found`` when the invoice does not exist. An invoice that is
    already paid is left untouched so its original ``paid_at`` is preserved.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    if not verify_csrf_token(session, csrf_token):
        return RedirectResponse(url=f"/manage/payments/{invoice_id}?error=csrf", status_code=303)

    invoice = await Invoice.get_or_none(id=invoice_id)
    if not invoice:
        # Previously this fell through and reported success for a missing invoice.
        return RedirectResponse(url="/manage/payments?error=not_found", status_code=303)

    # Re-submitting the form must not overwrite the original payment timestamp.
    if invoice.status != "paid":
        invoice.status = "paid"
        invoice.paid_at = datetime.utcnow()
        await invoice.save()

    return RedirectResponse(url=f"/manage/payments/{invoice_id}?success=1", status_code=303)
|
|
|
|
|
|
@router.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request):
    """Render the settings page listing pricing configs ordered by ``config_key``.

    Redirects to the login page when there is no admin session.
    """
    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    configs = await PricingConfig.all().order_by("config_key")

    context = {
        "request": request,
        "session": session,
        "csrf_token": generate_csrf_token(session),
        "pricing_configs": configs,
    }
    return templates.TemplateResponse("admin/settings.html", context)
|
|
|
|
|
|
@router.post("/settings/pricing/{config_id}")
async def update_pricing(
    request: Request,
    config_id: int,
    csrf_token: str = Form(...),
    value: str = Form(...)
):
    """Update one pricing config value and redirect to the settings page.

    Redirects to the login page without a session, and to the settings page
    with ``error=csrf`` on a bad CSRF token, ``error=not_found`` for an
    unknown config, or ``error=invalid_value`` when ``value`` is not a finite
    decimal number. On success the redirect carries ``success=1``.
    """
    # Local import: the module-level import only brings in Decimal.
    from decimal import InvalidOperation

    session = get_admin_session(request)
    if not session:
        return RedirectResponse(url="/manage/login", status_code=303)

    if not verify_csrf_token(session, csrf_token):
        return RedirectResponse(url="/manage/settings?error=csrf", status_code=303)

    config = await PricingConfig.get_or_none(id=config_id)
    if not config:
        # Previously a missing config was silently reported as success.
        return RedirectResponse(url="/manage/settings?error=not_found", status_code=303)

    try:
        new_value = Decimal(value)
    except InvalidOperation:
        # Malformed input used to escape as an unhandled exception.
        return RedirectResponse(url="/manage/settings?error=invalid_value", status_code=303)

    # Decimal() accepts "NaN" and "Infinity", neither of which is a usable price.
    if not new_value.is_finite():
        return RedirectResponse(url="/manage/settings?error=invalid_value", status_code=303)

    config.config_value = new_value
    config.updated_at = datetime.utcnow()
    await config.save()

    return RedirectResponse(url="/manage/settings?success=1", status_code=303)
|