from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.responses import JSONResponse
from typing import List, Optional
from datetime import datetime, date
from decimal import Decimal
import calendar
from ..auth import get_current_user
from ..models import User
from ..billing.models import (
Invoice, InvoiceLineItem, UserSubscription, PricingConfig,
PaymentMethod, UsageAggregate, SubscriptionPlan
)
from ..billing.usage_tracker import UsageTracker
from ..billing.invoice_generator import InvoiceGenerator
from ..billing.stripe_client import StripeClient
from pydantic import BaseModel
router = APIRouter(
prefix="/api/billing",
tags=["billing"]
)
class UsageResponse(BaseModel):
storage_gb_avg: float
storage_gb_peak: float
bandwidth_up_gb: float
bandwidth_down_gb: float
total_bandwidth_gb: float
period: str
class InvoiceResponse(BaseModel):
id: int
invoice_number: str
period_start: date
period_end: date
subtotal: float
tax: float
total: float
status: str
due_date: Optional[date]
paid_at: Optional[datetime]
line_items: List[dict]
class SubscriptionResponse(BaseModel):
id: int
billing_type: str
plan_name: Optional[str]
status: str
current_period_start: Optional[datetime]
current_period_end: Optional[datetime]
@router.get("/usage/current")
async def get_current_usage(current_user: User = Depends(get_current_user)):
storage_bytes = await UsageTracker.get_current_storage(current_user)
today = date.today()
usage_today = await UsageAggregate.get_or_none(user=current_user, date=today)
if usage_today:
return {
"storage_gb": round(storage_bytes / (1024**3), 4),
"bandwidth_down_gb_today": round(usage_today.bandwidth_down_bytes / (1024**3), 4),
"bandwidth_up_gb_today": round(usage_today.bandwidth_up_bytes / (1024**3), 4),
"as_of": today.isoformat()
}
return {
"storage_gb": round(storage_bytes / (1024**3), 4),
"bandwidth_down_gb_today": 0,
"bandwidth_up_gb_today": 0,
"as_of": today.isoformat()
}
@router.get("/usage/monthly")
async def get_monthly_usage(
year: Optional[int] = None,
month: Optional[int] = None,
current_user: User = Depends(get_current_user)
) -> UsageResponse:
if year is None or month is None:
now = datetime.now()
year = now.year
month = now.month
usage = await UsageTracker.get_monthly_usage(current_user, year, month)
return UsageResponse(
**usage,
period=f"{year}-{month:02d}"
)
@router.get("/invoices")
async def list_invoices(
limit: int = 50,
offset: int = 0,
current_user: User = Depends(get_current_user)
) -> List[InvoiceResponse]:
invoices = await Invoice.filter(user=current_user).order_by("-created_at").offset(offset).limit(limit).all()
result = []
for invoice in invoices:
line_items = await invoice.line_items.all()
result.append(InvoiceResponse(
id=invoice.id,
invoice_number=invoice.invoice_number,
period_start=invoice.period_start,
period_end=invoice.period_end,
subtotal=float(invoice.subtotal),
tax=float(invoice.tax),
total=float(invoice.total),
status=invoice.status,
due_date=invoice.due_date,
paid_at=invoice.paid_at,
line_items=[
{
"description": item.description,
"quantity": float(item.quantity),
"unit_price": float(item.unit_price),
"amount": float(item.amount),
"type": item.item_type
}
for item in line_items
]
))
return result
@router.get("/invoices/{invoice_id}")
async def get_invoice(
invoice_id: int,
current_user: User = Depends(get_current_user)
) -> InvoiceResponse:
invoice = await Invoice.get_or_none(id=invoice_id, user=current_user)
if not invoice:
raise HTTPException(status_code=404, detail="Invoice not found")
line_items = await invoice.line_items.all()
return InvoiceResponse(
id=invoice.id,
invoice_number=invoice.invoice_number,
period_start=invoice.period_start,
period_end=invoice.period_end,
subtotal=float(invoice.subtotal),
tax=float(invoice.tax),
total=float(invoice.total),
status=invoice.status,
due_date=invoice.due_date,
paid_at=invoice.paid_at,
line_items=[
{
"description": item.description,
"quantity": float(item.quantity),
"unit_price": float(item.unit_price),
"amount": float(item.amount),
"type": item.item_type
}
for item in line_items
]
)
@router.get("/subscription")
async def get_subscription(current_user: User = Depends(get_current_user)) -> SubscriptionResponse:
subscription = await UserSubscription.get_or_none(user=current_user)
if not subscription:
subscription = await UserSubscription.create(
user=current_user,
billing_type="pay_as_you_go",
status="active"
)
plan_name = None
if subscription.plan:
plan = await subscription.plan
plan_name = plan.display_name
return SubscriptionResponse(
id=subscription.id,
billing_type=subscription.billing_type,
plan_name=plan_name,
status=subscription.status,
current_period_start=subscription.current_period_start,
current_period_end=subscription.current_period_end
)
@router.post("/payment-methods/setup-intent")
async def create_setup_intent(current_user: User = Depends(get_current_user)):
subscription = await UserSubscription.get_or_none(user=current_user)
if not subscription or not subscription.stripe_customer_id:
customer_id = await StripeClient.create_customer(
email=current_user.email,
name=current_user.username,
metadata={"user_id": str(current_user.id)}
)
if not subscription:
subscription = await UserSubscription.create(
user=current_user,
billing_type="pay_as_you_go",
stripe_customer_id=customer_id,
status="active"
)
else:
subscription.stripe_customer_id = customer_id
await subscription.save()
import stripe
setup_intent = stripe.SetupIntent.create(
customer=subscription.stripe_customer_id,
payment_method_types=["card"]
)
return {
"client_secret": setup_intent.client_secret,
"customer_id": subscription.stripe_customer_id
}
@router.get("/payment-methods")
async def list_payment_methods(current_user: User = Depends(get_current_user)):
methods = await PaymentMethod.filter(user=current_user).all()
return [
{
"id": m.id,
"type": m.type,
"last4": m.last4,
"brand": m.brand,
"exp_month": m.exp_month,
"exp_year": m.exp_year,
"is_default": m.is_default
}
for m in methods
]
@router.post("/webhooks/stripe")
async def stripe_webhook(request: Request):
import stripe
from ..settings import settings
payload = await request.body()
sig_header = request.headers.get("stripe-signature")
try:
event = stripe.Webhook.construct_event(
payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid payload")
except stripe.error.SignatureVerificationError:
raise HTTPException(status_code=400, detail="Invalid signature")
if event["type"] == "invoice.payment_succeeded":
invoice_data = event["data"]["object"]
rbox_invoice_id = invoice_data.get("metadata", {}).get("rbox_invoice_id")
if rbox_invoice_id:
invoice = await Invoice.get_or_none(id=int(rbox_invoice_id))
if invoice:
await InvoiceGenerator.mark_invoice_paid(invoice)
elif event["type"] == "invoice.payment_failed":
pass
elif event["type"] == "payment_method.attached":
payment_method = event["data"]["object"]
customer_id = payment_method["customer"]
subscription = await UserSubscription.get_or_none(stripe_customer_id=customer_id)
if subscription:
await PaymentMethod.create(
user=subscription.user,
stripe_payment_method_id=payment_method["id"],
type=payment_method["type"],
last4=payment_method.get("card", {}).get("last4"),
brand=payment_method.get("card", {}).get("brand"),
exp_month=payment_method.get("card", {}).get("exp_month"),
exp_year=payment_method.get("card", {}).get("exp_year"),
is_default=True
)
return JSONResponse(content={"status": "success"})
@router.get("/pricing")
async def get_pricing():
configs = await PricingConfig.all()
return {
config.config_key: {
"value": float(config.config_value),
"description": config.description,
"unit": config.unit
}
for config in configs
}
@router.get("/plans")
async def list_plans():
plans = await SubscriptionPlan.filter(is_active=True).all()
return [
{
"id": plan.id,
"name": plan.name,
"display_name": plan.display_name,
"description": plan.description,
"storage_gb": plan.storage_gb,
"bandwidth_gb": plan.bandwidth_gb,
"price_monthly": float(plan.price_monthly),
"price_yearly": float(plan.price_yearly) if plan.price_yearly else None
}
for plan in plans
]