Add python_imap_usage.md
This commit is contained in:
parent
0dc8ec5c6f
commit
5bf3bc26f8
145
python_imap_usage.md
Normal file
145
python_imap_usage.md
Normal file
@ -0,0 +1,145 @@
|
|||||||
|
# Python SMTP usage
|
||||||
|
|
||||||
|
Two examples of how to use the builtin IMAP library.
|
||||||
|
|
||||||
|
## Reading messages
|
||||||
|
|
||||||
|
```
|
||||||
|
import os
|
||||||
|
import base64
|
||||||
|
import env
|
||||||
|
|
||||||
|
if not env.secret:
|
||||||
|
raise Exception("Environment issue.")
|
||||||
|
|
||||||
|
import imaplib
|
||||||
|
import email
|
||||||
|
from email.header import decode_header
|
||||||
|
|
||||||
|
imap_server = "mail.molodetz.nl"
|
||||||
|
email_user = "retoor@molodetz.nl"
|
||||||
|
email_password = env.secret
|
||||||
|
|
||||||
|
try:
|
||||||
|
mail = imaplib.IMAP4_SSL(imap_server)
|
||||||
|
mail.login(email_user, email_password)
|
||||||
|
mail.select("inbox")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
status, messages = mail.search(None, "ALL")
|
||||||
|
email_ids = messages[0].split()
|
||||||
|
|
||||||
|
for email_id in email_ids[-1:]:
|
||||||
|
status, msg_data = mail.fetch(email_id, "(RFC822)")
|
||||||
|
|
||||||
|
for response_part in msg_data:
|
||||||
|
if isinstance(response_part, tuple):
|
||||||
|
msg = email.message_from_bytes(response_part[1])
|
||||||
|
subject, encoding = decode_header(msg["Subject"])[0]
|
||||||
|
if isinstance(subject, bytes):
|
||||||
|
subject = subject.decode(encoding or "utf-8")
|
||||||
|
print(f"Subject: {subject}")
|
||||||
|
|
||||||
|
from_ = msg.get("From")
|
||||||
|
print(f"From: {from_}")
|
||||||
|
|
||||||
|
if from_ == "retoor@molodetz.nl":
|
||||||
|
mail.store(email_id, '+FLAGS', '\\Deleted')
|
||||||
|
mail.expunge()
|
||||||
|
|
||||||
|
if msg.is_multipart():
|
||||||
|
for part in msg.walk():
|
||||||
|
content_type = part.get_content_type()
|
||||||
|
content_disposition = str(part.get("Content-Disposition"))
|
||||||
|
if content_type == "text/plain" and "attachment" not in content_disposition:
|
||||||
|
body = part.get_payload(decode=True).decode()
|
||||||
|
print(f"Body: {body}")
|
||||||
|
else:
|
||||||
|
body = msg.get_payload(decode=True).decode()
|
||||||
|
print(f"Body: {body}")
|
||||||
|
|
||||||
|
mail.logout()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}")
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
Sending email (requires my AsyncioPath made in other gist):
|
||||||
|
|
||||||
|
```
|
||||||
|
class SMTP(AsyncioPatch):
|
||||||
|
|
||||||
|
def __init__(self, smtp_server, port, sender_email, password):
|
||||||
|
self.smtp_server = smtp_server
|
||||||
|
self.port = port
|
||||||
|
self.sender_email = sender_email
|
||||||
|
self.password = password
|
||||||
|
self.server = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.server = smtplib.SMTP(self.smtp_server, self.port)
|
||||||
|
print("Connected to:",self.smtp_server)
|
||||||
|
self.server.starttls()
|
||||||
|
self.server.login(self.sender_email, self.password)
|
||||||
|
print("Logged in as:", self.sender_email)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.server.quit()
|
||||||
|
self.server = None
|
||||||
|
print("Server disconnected")
|
||||||
|
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.connect()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args, **kwargs):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def send_mail(self, recipient_email, subject, body):
|
||||||
|
msg = MIMEMultipart()
|
||||||
|
msg["From"] = self.sender_email
|
||||||
|
msg["To"] = recipient_email
|
||||||
|
msg["Subject"] = subject
|
||||||
|
msg.attach(MIMEText(body, "plain"))
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.server.sendmail(self.sender_email, recipient_email, msg.as_string())
|
||||||
|
print("Email sent successfully!")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Email configuration
|
||||||
|
server = SMTP(smtp_server = "mail.molodetz.nl", # Replace with your SMTP server
|
||||||
|
port = 587, # Usually 587 for TLS, 465 for SSL
|
||||||
|
sender_email = "retoor@molodetz.nl",
|
||||||
|
password = secret)
|
||||||
|
|
||||||
|
with server:
|
||||||
|
for x in range(10):
|
||||||
|
server.send_mail(
|
||||||
|
recipient_email = "retoor@molodetz.nl",
|
||||||
|
subject="Email nr {}".format(x),
|
||||||
|
body="Email body nr{}".format(x)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def test_async():
|
||||||
|
async with server:
|
||||||
|
for x in range(10):
|
||||||
|
await server.asend_mail(
|
||||||
|
recipient_email = "retoor@molodetz.nl",
|
||||||
|
subject="Email nr {}".format(x),
|
||||||
|
body="Email body nr{}".format(x)
|
||||||
|
)
|
||||||
|
|
||||||
|
asyncio.run(test_async())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
```
|
Loading…
Reference in New Issue
Block a user