diff --git a/src/snek/system/notification.py b/src/snek/system/notification.py index 7f4274d..451e43a 100644 --- a/src/snek/system/notification.py +++ b/src/snek/system/notification.py @@ -1,3 +1,4 @@ +import random import time import base64 import uuid @@ -138,61 +139,61 @@ class Notifications: algorithm="ES256", ) - def create_encrypted_payload(self, auth: str, p256dh: str, payload: str): - # 1. Generate private key - private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) + def create_encrypted_payload(self, endpoint: str, auth: str, p256dh: str, payload: str): + message_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) - # 2. Get public key - public_key = private_key.public_key() - - salt = os.urandom(16) - - subscription_pub_key_bytes = base64.urlsafe_b64decode(p256dh + "====") - subscription_public_key = ec.EllipticCurvePublicKey.from_encoded_point( - ec.SECP256R1(), subscription_pub_key_bytes - ) - shared_secret = private_key.exchange(ec.ECDH(), subscription_public_key) - - auth_dec = base64.urlsafe_b64decode(auth + "==") - auth_enc = b"Content-Encoding: auth\x00" - prk = hkdf(shared_secret, auth_dec, auth_enc, 32) - - # 5. Build context - key_label = b"P-256\x00" - subscription_len = len(subscription_pub_key_bytes).to_bytes(2, "big") - local_pub_bytes = public_key.public_bytes( + message_public_key_bytes = message_private_key.public_key().public_bytes( encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint, ) - local_len = len(local_pub_bytes).to_bytes(2, "big") - context = ( - key_label - + subscription_len - + subscription_pub_key_bytes - + local_len - + local_pub_bytes + + salt = os.urandom(16) + + user_key_bytes = base64.urlsafe_b64decode(p256dh + "==") + shared_secret = message_private_key.exchange( + ec.ECDH(), + ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), user_key_bytes + ), ) - # 6. Generate nonce and CEK - nonce_enc = b"Content-Encoding: nonce\x00" - nonce_info = nonce_enc + context - cek_enc = b"Content-Encoding: aesgcm\x00" - cek_info = cek_enc + context + encryption_key = hkdf( + shared_secret, + base64.urlsafe_b64decode(auth + "=="), + b"Content-Encoding: auth\x00", + 32, + ) - nonce = hkdf(prk, salt, nonce_info, 12) - content_encryption_key = hkdf(prk, salt, cek_info, 16) + context = ( + b"P-256\x00" + + len(user_key_bytes).to_bytes(2, "big") + + user_key_bytes + + len(message_public_key_bytes).to_bytes(2, "big") + + message_public_key_bytes + ) - # 7. Encrypt payload with AES-GCM - aesgcm = AESGCM(content_encryption_key) - padding_length = 0 # adjust if needed + nonce = hkdf(encryption_key, salt, b"Content-Encoding: nonce\x00" + context, 12) + content_encryption_key = hkdf( + encryption_key, salt, b"Content-Encoding: aesgcm\x00" + context, 16 + ) + + padding_length = random.randint(0, 16) padding = padding_length.to_bytes(2, "big") + b"\x00" * padding_length - combined = padding + payload.encode("utf-8") - encrypted = aesgcm.encrypt(nonce, combined, None) + + data = AESGCM(content_encryption_key).encrypt( + nonce, padding + payload.encode("utf-8"), None + ) return { - 'payload': encrypted, - 'salt': salt, - 'public_key': local_pub_bytes + "headers": { + "Authorization": f"WebPush {self.create_notification_authorization(endpoint)}", + "Crypto-Key": f"dh={_browser_base64(message_public_key_bytes)}; p256ecdsa={self.public_key_base64}", + "Encryption": f"salt={_browser_base64(salt)}", + "Content-Encoding": "aesgcm", + "Content-Length": str(len(data)), + "Content-Type": "application/octet-stream", + }, + "data": data, } diff --git a/src/snek/view/push.py b/src/snek/view/push.py index bf1bca3..643e143 100644 --- a/src/snek/view/push.py +++ b/src/snek/view/push.py @@ -77,8 +77,6 @@ class PushView(BaseFormView): print(body) notifications = get_notifications() - cert = notifications.public_key_base64 - test_payload = { "title": "Hey retoor", "message": "Guess what? ;P", @@ -86,30 +84,21 @@ class PushView(BaseFormView): "url": "/web.html", } - encryped = notifications.create_encrypted_payload( + notification_info = notifications.create_encrypted_payload( + body['endpoint'], body["keys"]["auth"], body["keys"]["p256dh"], json.dumps(test_payload), ) - payload = encryped["payload"] - salt = encryped["salt"] - public_key = encryped["public_key"] - headers = { + **notification_info["headers"], "TTL": "60", - "Authorization": f"WebPush {notifications.create_notification_authorization(body['endpoint'])}", - "Crypto-Key": f"dh={base64.urlsafe_b64encode(public_key).decode('utf-8').rstrip('=')}; p256ecdsa={cert}", - "Encryption": f"salt={base64.urlsafe_b64encode(salt).decode('utf-8').rstrip('=')}", - - "Content-Encoding": "aesgcm", - "Content-Length": str(len(payload)), - "Content-Type": "application/octet-stream", } print(headers) - post_notification = requests.post(body["endpoint"], headers=headers, data=payload) + post_notification = requests.post(body["endpoint"], headers=headers, data=notification_info["data"]) print(post_notification.status_code) print(post_notification.text)