import asyncio
import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Optional
from .settings import settings
class EmailTask:
def __init__(
self,
to_email: str,
subject: str,
body: str,
html: Optional[str] = None,
**kwargs,
):
self.to_email = to_email
self.subject = subject
self.body = body
self.html = html
self.kwargs = kwargs
class EmailService:
def __init__(self):
self.queue = asyncio.Queue()
self.worker_task: Optional[asyncio.Task] = None
self.running = False
async def start(self):
"""Start the email worker"""
if self.running:
return
self.running = True
self.worker_task = asyncio.create_task(self._worker())
async def stop(self):
"""Stop the email worker"""
if not self.running:
return
self.running = False
if self.worker_task:
self.worker_task.cancel()
try:
await self.worker_task
except asyncio.CancelledError:
pass
async def send_email(
self,
to_email: str,
subject: str,
body: str,
html: Optional[str] = None,
**kwargs,
):
"""Queue an email for sending"""
task = EmailTask(to_email, subject, body, html, **kwargs)
await self.queue.put(task)
async def _worker(self):
"""Email worker coroutine"""
while self.running:
try:
# Wait for a task with timeout to allow checking running flag
task = await asyncio.wait_for(self.queue.get(), timeout=1.0)
await self._send_email_task(task)
self.queue.task_done()
except asyncio.TimeoutError:
continue
except Exception as e:
# Log error, but continue processing
print(f"Email worker error: {e}")
continue
async def _send_email_task(self, task: EmailTask):
"""Send a single email task"""
if (
not settings.SMTP_HOST
or not settings.SMTP_USERNAME
or not settings.SMTP_PASSWORD
):
print("SMTP not configured, skipping email send")
return
msg = MIMEMultipart("alternative")
msg["From"] = settings.SMTP_SENDER_EMAIL
msg["To"] = task.to_email
msg["Subject"] = task.subject
# Add text part
text_part = MIMEText(task.body, "plain")
msg.attach(text_part)
# Add HTML part if provided
if task.html:
html_part = MIMEText(task.html, "html")
msg.attach(html_part)
try:
# Use implicit TLS for port 465 or when SMTP_USE_TLS is True
if settings.SMTP_PORT == 465 or settings.SMTP_USE_TLS:
async with aiosmtplib.SMTP(
hostname=settings.SMTP_HOST,
port=settings.SMTP_PORT,
username=settings.SMTP_USERNAME,
password=settings.SMTP_PASSWORD,
use_tls=True,
) as smtp:
await smtp.send_message(msg)
print(f"Email sent to {task.to_email}")
else:
# Use STARTTLS for other ports
async with aiosmtplib.SMTP(
hostname=settings.SMTP_HOST,
port=settings.SMTP_PORT,
) as smtp:
try:
await smtp.starttls()
except Exception as tls_error:
if (
"already using" in str(tls_error).lower()
or "tls" in str(tls_error).lower()
):
# Connection is already using TLS, proceed without starttls
pass
else:
raise
await smtp.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
await smtp.send_message(msg)
print(f"Email sent to {task.to_email}")
except Exception as e:
print(f"Failed to send email to {task.to_email}: {e}")
raise # Re-raise to let caller handle
# Global email service instance
email_service = EmailService()
# Convenience functions
async def send_email(
to_email: str, subject: str, body: str, html: Optional[str] = None, **kwargs
):
"""Send an email asynchronously"""
await email_service.send_email(to_email, subject, body, html, **kwargs)
def queue_email(
to_email: str, subject: str, body: str, html: Optional[str] = None, **kwargs
):
"""Queue an email for sending (fire and forget)"""
asyncio.create_task(
email_service.send_email(to_email, subject, body, html, **kwargs)
)