63 lines
1.8 KiB
Python
Raw Normal View History

2025-11-10 15:46:40 +01:00
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, date, timedelta
from .usage_tracker import UsageTracker
from .invoice_generator import InvoiceGenerator
from ..models import User
# Module-level scheduler instance: start_scheduler() registers the jobs on it
# and starts it; stop_scheduler() shuts it down.
scheduler = AsyncIOScheduler()
2025-11-13 23:22:05 +01:00
2025-11-10 15:46:40 +01:00
async def aggregate_daily_usage_for_all_users():
    """Aggregate yesterday's usage for every active user.

    Loads all users with ``is_active=True`` and calls
    ``UsageTracker.aggregate_daily_usage`` for each one with the previous
    calendar day (relative to ``date.today()``). Users are processed one at
    a time; a failure for one user is printed and does not stop the rest.
    """
    active_users = await User.filter(is_active=True).all()
    one_day = timedelta(days=1)
    target_day = date.today() - one_day
    for user in active_users:
        try:
            await UsageTracker.aggregate_daily_usage(user, target_day)
        except Exception as e:
            # Best-effort per user: report the failure and move on.
            print(f"Failed to aggregate usage for user {user.id}: {e}")
2025-11-13 23:22:05 +01:00
2025-11-10 15:46:40 +01:00
async def generate_monthly_invoices():
    """Generate and finalize last month's invoice for every active user.

    The billing period is the calendar month before ``datetime.now()``;
    when the current month is January it rolls back to December of the
    previous year. For each user with ``is_active=True`` the invoice is
    generated and, only if a truthy invoice comes back, finalized. A failure
    for one user is printed and does not stop the remaining users.
    """
    today = datetime.now()
    if today.month > 1:
        billing_year, billing_month = today.year, today.month - 1
    else:
        # January: the previous month is December of the prior year.
        billing_year, billing_month = today.year - 1, 12

    active_users = await User.filter(is_active=True).all()
    for user in active_users:
        try:
            invoice = await InvoiceGenerator.generate_monthly_invoice(
                user, billing_year, billing_month
            )
            if not invoice:
                # Nothing was generated for this user; nothing to finalize.
                continue
            await InvoiceGenerator.finalize_invoice(invoice)
        except Exception as e:
            print(f"Failed to generate invoice for user {user.id}: {e}")
2025-11-13 23:22:05 +01:00
2025-11-10 15:46:40 +01:00
def start_scheduler():
    """Register the recurring billing jobs and start the scheduler.

    Two cron jobs are registered on the module-level ``scheduler``:

    * ``aggregate_daily_usage`` -- runs
      ``aggregate_daily_usage_for_all_users`` every day at 01:00.
    * ``generate_monthly_invoices`` -- runs ``generate_monthly_invoices``
      on day 1 of every month at 02:00.

    No timezone is passed to the triggers, so the times follow the
    trigger's default timezone. Both jobs use ``replace_existing=True`` so
    registering them again under the same id replaces the earlier job
    instead of conflicting with it.

    The scheduler is only started when it is not already running, which
    makes repeated calls safe: calling ``scheduler.start()`` on a running
    scheduler raises ``SchedulerAlreadyRunningError``.
    """
    scheduler.add_job(
        aggregate_daily_usage_for_all_users,
        CronTrigger(hour=1, minute=0),
        id="aggregate_daily_usage",
        name="Aggregate daily usage for all users",
        replace_existing=True,
    )
    scheduler.add_job(
        generate_monthly_invoices,
        CronTrigger(day=1, hour=2, minute=0),
        id="generate_monthly_invoices",
        name="Generate monthly invoices",
        replace_existing=True,
    )
    # Guard against a double start; the jobs above are refreshed either way.
    if not scheduler.running:
        scheduler.start()
2025-11-13 23:22:05 +01:00
2025-11-10 15:46:40 +01:00
def stop_scheduler():
    """Shut down the module-level scheduler if it is currently running.

    ``scheduler.shutdown()`` raises ``SchedulerNotRunningError`` when the
    scheduler was never started or has already been stopped, so the call is
    guarded. This makes the function safe to call unconditionally, for
    example from an application shutdown hook.
    """
    if scheduler.running:
        scheduler.shutdown()