Cleaned up code a bit

This commit is contained in:
BordedDev 2025-04-13 23:14:01 +02:00
parent d966c9529b
commit 326c549670
No known key found for this signature in database
GPG Key ID: C5F495EAE56673BF
2 changed files with 49 additions and 59 deletions

View File

@ -1,3 +1,4 @@
import random
import time import time
import base64 import base64
import uuid import uuid
@ -138,61 +139,61 @@ class Notifications:
algorithm="ES256", algorithm="ES256",
) )
def create_encrypted_payload(self, auth: str, p256dh: str, payload: str): def create_encrypted_payload(self, endpoint: str, auth: str, p256dh: str, payload: str):
# 1. Generate private key message_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
# 2. Get public key message_public_key_bytes = message_private_key.public_key().public_bytes(
public_key = private_key.public_key()
salt = os.urandom(16)
subscription_pub_key_bytes = base64.urlsafe_b64decode(p256dh + "====")
subscription_public_key = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), subscription_pub_key_bytes
)
shared_secret = private_key.exchange(ec.ECDH(), subscription_public_key)
auth_dec = base64.urlsafe_b64decode(auth + "==")
auth_enc = b"Content-Encoding: auth\x00"
prk = hkdf(shared_secret, auth_dec, auth_enc, 32)
# 5. Build context
key_label = b"P-256\x00"
subscription_len = len(subscription_pub_key_bytes).to_bytes(2, "big")
local_pub_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962, encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint, format=serialization.PublicFormat.UncompressedPoint,
) )
local_len = len(local_pub_bytes).to_bytes(2, "big")
context = ( salt = os.urandom(16)
key_label
+ subscription_len user_key_bytes = base64.urlsafe_b64decode(p256dh + "==")
+ subscription_pub_key_bytes shared_secret = message_private_key.exchange(
+ local_len ec.ECDH(),
+ local_pub_bytes ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), user_key_bytes
),
) )
# 6. Generate nonce and CEK encryption_key = hkdf(
nonce_enc = b"Content-Encoding: nonce\x00" shared_secret,
nonce_info = nonce_enc + context base64.urlsafe_b64decode(auth + "=="),
cek_enc = b"Content-Encoding: aesgcm\x00" b"Content-Encoding: auth\x00",
cek_info = cek_enc + context 32,
)
nonce = hkdf(prk, salt, nonce_info, 12) context = (
content_encryption_key = hkdf(prk, salt, cek_info, 16) b"P-256\x00"
+ len(user_key_bytes).to_bytes(2, "big")
+ user_key_bytes
+ len(message_public_key_bytes).to_bytes(2, "big")
+ message_public_key_bytes
)
# 7. Encrypt payload with AES-GCM nonce = hkdf(encryption_key, salt, b"Content-Encoding: nonce\x00" + context, 12)
aesgcm = AESGCM(content_encryption_key) content_encryption_key = hkdf(
padding_length = 0 # adjust if needed encryption_key, salt, b"Content-Encoding: aesgcm\x00" + context, 16
)
padding_length = random.randint(0, 16)
padding = padding_length.to_bytes(2, "big") + b"\x00" * padding_length padding = padding_length.to_bytes(2, "big") + b"\x00" * padding_length
combined = padding + payload.encode("utf-8")
encrypted = aesgcm.encrypt(nonce, combined, None) data = AESGCM(content_encryption_key).encrypt(
nonce, padding + payload.encode("utf-8"), None
)
return { return {
'payload': encrypted, "headers": {
'salt': salt, "Authorization": f"WebPush {self.create_notification_authorization(endpoint)}",
'public_key': local_pub_bytes "Crypto-Key": f"dh={_browser_base64(message_public_key_bytes)}; p256ecdsa={self.public_key_base64}",
"Encryption": f"salt={_browser_base64(salt)}",
"Content-Encoding": "aesgcm",
"Content-Length": str(len(data)),
"Content-Type": "application/octet-stream",
},
"data": data,
} }

View File

@ -77,8 +77,6 @@ class PushView(BaseFormView):
print(body) print(body)
notifications = get_notifications() notifications = get_notifications()
cert = notifications.public_key_base64
test_payload = { test_payload = {
"title": "Hey retoor", "title": "Hey retoor",
"message": "Guess what? ;P", "message": "Guess what? ;P",
@ -86,30 +84,21 @@ class PushView(BaseFormView):
"url": "/web.html", "url": "/web.html",
} }
encryped = notifications.create_encrypted_payload( notification_info = notifications.create_encrypted_payload(
body['endpoint'],
body["keys"]["auth"], body["keys"]["auth"],
body["keys"]["p256dh"], body["keys"]["p256dh"],
json.dumps(test_payload), json.dumps(test_payload),
) )
payload = encryped["payload"]
salt = encryped["salt"]
public_key = encryped["public_key"]
headers = { headers = {
**notification_info["headers"],
"TTL": "60", "TTL": "60",
"Authorization": f"WebPush {notifications.create_notification_authorization(body['endpoint'])}",
"Crypto-Key": f"dh={base64.urlsafe_b64encode(public_key).decode('utf-8').rstrip('=')}; p256ecdsa={cert}",
"Encryption": f"salt={base64.urlsafe_b64encode(salt).decode('utf-8').rstrip('=')}",
"Content-Encoding": "aesgcm",
"Content-Length": str(len(payload)),
"Content-Type": "application/octet-stream",
} }
print(headers) print(headers)
post_notification = requests.post(body["endpoint"], headers=headers, data=payload) post_notification = requests.post(body["endpoint"], headers=headers, data=notification_info["data"])
print(post_notification.status_code) print(post_notification.status_code)
print(post_notification.text) print(post_notification.text)