diff --git a/python_imap_usage.md b/python_imap_usage.md new file mode 100644 index 0000000..04ac7b6 --- /dev/null +++ b/python_imap_usage.md @@ -0,0 +1,145 @@ +# Python SMTP usage + +Two examples of how to use the builtin IMAP library. + +## Reading messages + +``` +import os +import base64 +import env + +if not env.secret: + raise Exception("Environment issue.") + +import imaplib +import email +from email.header import decode_header + +imap_server = "mail.molodetz.nl" +email_user = "retoor@molodetz.nl" +email_password = env.secret + +try: + mail = imaplib.IMAP4_SSL(imap_server) + mail.login(email_user, email_password) + mail.select("inbox") + + while True: + status, messages = mail.search(None, "ALL") + email_ids = messages[0].split() + + for email_id in email_ids[-1:]: + status, msg_data = mail.fetch(email_id, "(RFC822)") + + for response_part in msg_data: + if isinstance(response_part, tuple): + msg = email.message_from_bytes(response_part[1]) + subject, encoding = decode_header(msg["Subject"])[0] + if isinstance(subject, bytes): + subject = subject.decode(encoding or "utf-8") + print(f"Subject: {subject}") + + from_ = msg.get("From") + print(f"From: {from_}") + + if from_ == "retoor@molodetz.nl": + mail.store(email_id, '+FLAGS', '\\Deleted') + mail.expunge() + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition")) + if content_type == "text/plain" and "attachment" not in content_disposition: + body = part.get_payload(decode=True).decode() + print(f"Body: {body}") + else: + body = msg.get_payload(decode=True).decode() + print(f"Body: {body}") + + mail.logout() + +except Exception as e: + print(f"Error: {e}") + +``` + +Sending email (requires my AsyncioPath made in other gist): + +``` +class SMTP(AsyncioPatch): + + def __init__(self, smtp_server, port, sender_email, password): + self.smtp_server = smtp_server + self.port = port + self.sender_email = sender_email + self.password = password + self.server = None + + def connect(self): + self.server = smtplib.SMTP(self.smtp_server, self.port) + print("Connected to:",self.smtp_server) + self.server.starttls() + self.server.login(self.sender_email, self.password) + print("Logged in as:", self.sender_email) + + def close(self): + self.server.quit() + self.server = None + print("Server disconnected") + + + def __enter__(self): + self.connect() + return self + + def __exit__(self, *args, **kwargs): + self.close() + + def send_mail(self, recipient_email, subject, body): + msg = MIMEMultipart() + msg["From"] = self.sender_email + msg["To"] = recipient_email + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain")) + + try: + self.server.sendmail(self.sender_email, recipient_email, msg.as_string()) + print("Email sent successfully!") + + except Exception as e: + print(f"Error: {e}") + + finally: + pass + + +# Email configuration +server = SMTP(smtp_server = "mail.molodetz.nl", # Replace with your SMTP server + port = 587, # Usually 587 for TLS, 465 for SSL + sender_email = "retoor@molodetz.nl", + password = secret) + +with server: + for x in range(10): + server.send_mail( + recipient_email = "retoor@molodetz.nl", + subject="Email nr {}".format(x), + body="Email body nr{}".format(x) + ) + +async def test_async(): + async with server: + for x in range(10): + await server.asend_mail( + recipient_email = "retoor@molodetz.nl", + subject="Email nr {}".format(x), + body="Email body nr{}".format(x) + ) + +asyncio.run(test_async()) + + + +```