Cleaned up code a bit

This commit is contained in:
BordedDev 2025-04-13 03:19:28 +02:00
parent 4350714534
commit d966c9529b
No known key found for this signature in database
GPG Key ID: C5F495EAE56673BF
5 changed files with 131 additions and 186 deletions

View File

@ -1,62 +1,49 @@
// this.onpush = (event) => { export const registerServiceWorker = async (silent = false) => {
// console.log(event.data); try {
// // From here we can write the data to IndexedDB, send it to any open const serviceWorkerRegistration = await navigator.serviceWorker
// // windows, display a notification, etc. .register("/service-worker.js")
// };
function arrayBufferToBase64(buffer) { await serviceWorkerRegistration.update()
return btoa(String.fromCharCode(...new Uint8Array(buffer)))
const keyResponse = await fetch('/push.json')
const keyData = await keyResponse.json()
const publicKey = Uint8Array.from(atob(keyData.publicKey), c => c.charCodeAt(0))
const pushSubscription = await serviceWorkerRegistration.pushManager.subscribe({
userVisibleOnly: true, applicationServerKey: publicKey,
})
const subscriptionObject = {
...pushSubscription.toJSON(), encoding: PushManager.supportedContentEncodings,
};
console.log(pushSubscription.endpoint, pushSubscription, pushSubscription.toJSON(), subscriptionObject);
const response = await fetch('/push.json', {
method: 'POST', headers: {
'Content-Type': 'application/json',
}, body: JSON.stringify(subscriptionObject),
})
if (!response.ok) {
throw new Error('Bad status code from server.');
}
const responseData = await response.json();
console.log('Registration response', responseData);
} catch (error) {
console.error("Error registering service worker:", error);
if (!silent) {
alert("Registering push notifications failed. Please check your browser settings and try again.\n\n" + error);
}
}
} }
const keyResponse = await fetch('/push.json')
const keyData = await keyResponse.json()
console.log("Key data", keyData); window.registerNotificationsServiceWorker = () => {
const publicKey = Uint8Array.from(atob(keyData.publicKey), c => c.charCodeAt(0))
export const registerServiceWorker = async () => {
navigator.serviceWorker
.register("/service-worker.js")
.then((serviceWorkerRegistration) => {
console.log(serviceWorkerRegistration);
serviceWorkerRegistration.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: publicKey,
}).then(
(pushSubscription) => {
const subscriptionObject = {
...pushSubscription.toJSON(),
encoding: PushManager.supportedContentEncodings,
/* other app-specific data, such as user identity */
};
console.log(pushSubscription.endpoint, pushSubscription, pushSubscription.toJSON(), subscriptionObject);
fetch('/push.json', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(subscriptionObject),
}).then((response) => {
if (!response.ok) {
throw new Error('Bad status code from server.');
}
return response.json();
}).then((responseData) => {
console.log(responseData);
});
},
(error) => {
console.error(error);
},
);
});
}
window.registerServiceWorker = () => {
return Notification.requestPermission().then((permission) => { return Notification.requestPermission().then((permission) => {
if (permission === "granted") { if (permission === "granted") {
console.log("Permission was granted");
return registerServiceWorker(); return registerServiceWorker();
} else if (permission === "denied") { } else if (permission === "denied") {
console.log("Permission was denied"); console.log("Permission was denied");
@ -65,4 +52,4 @@ window.registerServiceWorker = () => {
} }
}); });
}; };
registerServiceWorker().catch(console.error); registerServiceWorker(true).catch(console.error);

View File

@ -1,83 +1,66 @@
async function requestNotificationPermission() { self.addEventListener("install", (event) => {
const permission = await Notification.requestPermission(); console.log("Service worker installing...");
return permission === "granted"; event.waitUntil(
} caches.open("snek-cache").then((cache) => {
return cache.addAll([]);
})
);
})
// Subscribe to Push Notifications self.addEventListener("activate", (event) => {
async function subscribeUser() { event.waitUntil(self.registration?.navigationPreload.enable());
const registration = });
await navigator.serviceWorker.register("/service-worker.js");
const subscription = await registration.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: urlBase64ToUint8Array(PUBLIC_VAPID_KEY),
});
// Send subscription to your backend self.addEventListener("push", (event) => {
await fetch("/subscribe", { if (!self.Notification || self.Notification.permission !== "granted") {
method: "POST", console.log("Notification permission not granted");
body: JSON.stringify(subscription), return;
headers: { }
"Content-Type": "application/json",
},
});
}
// Service Worker (service-worker.js) const data = event.data?.json() ?? {};
// self.addEventListener("push", (event) => { console.log("Received a push message", event, data);
// const data = event.data.json();
// self.registration.showNotification(data.title, {
// body: data.message,
// icon: data.icon,
// });
// });
const title = data.title || "Something Has Happened";
const message =
data.message || "Here's something you might want to check out.";
const icon = data.icon || "/image/snek512.png";
self.addEventListener("push", (event) => { const notificationSettings = data.notificationSettings || {};
if (!self.Notification || self.Notification.permission !== "granted") {
console.log("Notification permission not granted");
return;
}
const data = event.data?.json() ?? {}; console.log("Showing message", title, message, icon);
console.log("Received a push message", event, data);
const title = data.title || "Something Has Happened"; const reg = self.registration.showNotification(title, {
const message = body: message,
data.message || "Here's something you might want to check out."; tag: "message-received",
const icon = data.icon || "images/new-notification.png"; icon,
console.log("showing message", title, message, icon); badge: icon,
...notificationSettings,
const reg = self.registration.showNotification(title, { data,
body: message, }).then(e => console.log("Showing notification", e)).catch(console.error);
tag: "simple-push-demo-notification",
icon,
}).then(e => console.log("success", e)).catch(console.error);
event.waitUntil(reg);
event.waitUntil(reg);
}); });
self.addEventListener("notificationclick", (event) => { self.addEventListener("notificationclick", (event) => {
console.log("Notification click Received.", event); console.log("Notification click Received.", event);
event.notification.close(); event.notification.close();
event.waitUntil(clients.openWindow( event.waitUntil(clients.openWindow(`${event.notification.data.url || event.notification.data.link || `/web.html`}`));
"https://snek.molodetz.nl",));
}); });
self.addEventListener("notificationclose", (event) => { self.addEventListener("notificationclose", (event) => {
console.log("Notification closed", event);
}) })
self.addEventListener("fetch", (event) => { self.addEventListener("fetch", (event) => {
// console.log("Fetch event for ", event.request.url); // console.log("Fetch event for ", event.request.url);
event.respondWith( event.respondWith(
caches.match(event.request).then((response) => { caches.match(event.request).then((response) => {
if (response) { if (response) {
// console.log("Found response in cache: ", response); // console.log("Found response in cache: ", response);
return response; return response;
} }
// console.log("No response found in cache. About to fetch from network..."); // console.log("No response found in cache. About to fetch from network...");
return fetch(event.request); return fetch(event.request);
}) })
); );
}) })

View File

@ -2,6 +2,7 @@ import time
import base64 import base64
import uuid import uuid
from functools import cache from functools import cache
from pathlib import Path
import jwt import jwt
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
@ -14,66 +15,55 @@ from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.hashes import SHA256 from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.kdf.hkdf import HKDF
PRIVATE_KEY_FILE = "./notification-private.pem" # The only reason to persist the keys is to be able to use them in the web push
PRIVATE_KEY_PKCS8_FILE = "./notification-private.pkcs8.pem"
PUBLIC_KEY_FILE = "./notification-public.pem" PRIVATE_KEY_FILE = Path("./notification-private.pem")
PRIVATE_KEY_PKCS8_FILE = Path("./notification-private.pkcs8.pem")
PUBLIC_KEY_FILE = Path("./notification-public.pem")
def generate_private_key(): def generate_private_key():
if not os.path.isfile(PRIVATE_KEY_FILE): if not PRIVATE_KEY_FILE.exists():
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
# Serialize the private key to PEM format
pem = private_key.private_bytes( pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
# Write the private key to a file PRIVATE_KEY_FILE.write_bytes(pem)
with open(PRIVATE_KEY_FILE, "wb") as pem_out:
pem_out.write(pem)
def generate_pcks8_private_key(): def generate_pcks8_private_key():
# openssl pkcs8 -topk8 -nocrypt -in private.pem -out private.pkcs8.pem if not PRIVATE_KEY_PKCS8_FILE.exists():
if not os.path.isfile(PRIVATE_KEY_PKCS8_FILE): private_key = serialization.load_pem_private_key(
with open(PRIVATE_KEY_FILE, "rb") as pem_in: PRIVATE_KEY_FILE.read_bytes(), password=None, backend=default_backend()
private_key = serialization.load_pem_private_key( )
pem_in.read(), password=None, backend=default_backend()
)
# Serialize the private key to PKCS8 format
pem = private_key.private_bytes( pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(), encryption_algorithm=serialization.NoEncryption(),
) )
# Write the private key to a file PRIVATE_KEY_PKCS8_FILE.write_bytes(pem)
with open(PRIVATE_KEY_PKCS8_FILE, "wb") as pem_out:
pem_out.write(pem)
def generate_public_key(): def generate_public_key():
if not os.path.isfile(PUBLIC_KEY_FILE): if not PUBLIC_KEY_FILE.exists():
with open(PRIVATE_KEY_FILE, "rb") as pem_in: private_key = serialization.load_pem_private_key(
private_key = serialization.load_pem_private_key( PRIVATE_KEY_FILE.read_bytes(), password=None, backend=default_backend()
pem_in.read(), password=None, backend=default_backend() )
)
# Get the public key from the private key
public_key = private_key.public_key() public_key = private_key.public_key()
# Serialize the public key to PEM format
pem = public_key.public_bytes( pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo, format=serialization.PublicFormat.SubjectPublicKeyInfo,
) )
# Write the public key to a file PUBLIC_KEY_FILE.write_bytes(pem)
with open(PUBLIC_KEY_FILE, "wb") as pem_out:
pem_out.write(pem)
def ensure_certificates(): def ensure_certificates():
@ -92,41 +82,37 @@ def hkdf(input_key, salt, info, length):
).derive(input_key) ).derive(input_key)
def _browser_base64(data):
return base64.urlsafe_b64encode(data).decode("utf-8").rstrip("=")
class Notifications: class Notifications:
private_key_pem = None private_key_pem = None
public_key = None public_key = None
public_key_jwk = None public_key_base64 = None
def __init__(self): def __init__(self):
ensure_certificates() ensure_certificates()
with open(PRIVATE_KEY_FILE, "rb") as pem_in:
private_key = serialization.load_pem_private_key(
pem_in.read(), password=None, backend=default_backend()
)
self.private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(PUBLIC_KEY_FILE, "rb") as pem_in: private_key = serialization.load_pem_private_key(
self.public_key = serialization.load_pem_public_key( PRIVATE_KEY_FILE.read_bytes(), password=None, backend=default_backend()
pem_in.read(), backend=default_backend() )
self.private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
self.public_key = serialization.load_pem_public_key(
PUBLIC_KEY_FILE.read_bytes(), backend=default_backend()
)
self.public_key_base64 = _browser_base64(
self.public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
) )
public_numbers = self.public_key.public_numbers() )
self.public_key_jwk = {
"kty": "EC",
"crv": "P-256",
"x": base64.urlsafe_b64encode(
public_numbers.x.to_bytes(32, byteorder="big")
).decode("utf-8"),
"y": base64.urlsafe_b64encode(
public_numbers.y.to_bytes(32, byteorder="big")
).decode("utf-8"),
}
print(f"Public key JWK: {self.public_key_jwk}")
def create_notification_authorization(self, push_url): def create_notification_authorization(self, push_url):
target = urlparse(push_url) target = urlparse(push_url)
@ -152,7 +138,7 @@ class Notifications:
algorithm="ES256", algorithm="ES256",
) )
def create_encrypted_payload(self, auth: str, p256dh:str, payload:str): def create_encrypted_payload(self, auth: str, p256dh: str, payload: str):
# 1. Generate private key # 1. Generate private key
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
@ -161,13 +147,13 @@ class Notifications:
salt = os.urandom(16) salt = os.urandom(16)
subscription_pub_key_bytes = base64.urlsafe_b64decode(p256dh+ '==') subscription_pub_key_bytes = base64.urlsafe_b64decode(p256dh + "====")
subscription_public_key = ec.EllipticCurvePublicKey.from_encoded_point( subscription_public_key = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), subscription_pub_key_bytes ec.SECP256R1(), subscription_pub_key_bytes
) )
shared_secret = private_key.exchange(ec.ECDH(), subscription_public_key) shared_secret = private_key.exchange(ec.ECDH(), subscription_public_key)
auth_dec = base64.b64decode(auth+ '==') auth_dec = base64.urlsafe_b64decode(auth + "==")
auth_enc = b"Content-Encoding: auth\x00" auth_enc = b"Content-Encoding: auth\x00"
prk = hkdf(shared_secret, auth_dec, auth_enc, 32) prk = hkdf(shared_secret, auth_dec, auth_enc, 32)
@ -175,7 +161,8 @@ class Notifications:
key_label = b"P-256\x00" key_label = b"P-256\x00"
subscription_len = len(subscription_pub_key_bytes).to_bytes(2, "big") subscription_len = len(subscription_pub_key_bytes).to_bytes(2, "big")
local_pub_bytes = public_key.public_bytes( local_pub_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
) )
local_len = len(local_pub_bytes).to_bytes(2, "big") local_len = len(local_pub_bytes).to_bytes(2, "big")
context = ( context = (
@ -208,18 +195,6 @@ class Notifications:
'public_key': local_pub_bytes 'public_key': local_pub_bytes
} }
@property
def public_key_base64(self):
return (
base64.urlsafe_b64encode(
self.public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
)
.decode("utf-8")
.rstrip("=")
)
@cache @cache
def get_notifications(): def get_notifications():

View File

@ -41,7 +41,7 @@
<a class="no-select" style="display:none" id="install-button" href="#">📥</a> <a class="no-select" style="display:none" id="install-button" href="#">📥</a>
<a class="no-select" href="/threads.html">👥</a> <a class="no-select" href="/threads.html">👥</a>
<a class="no-select" href="/settings/index.html">⚙️</a> <a class="no-select" href="/settings/index.html">⚙️</a>
<a class="no-select" href="#" onclick="registerServiceWorker">✉️</a> <a class="no-select" href="#" onclick="registerNotificationsServiceWorker()">✉️</a>
<a class="no-select" href="/logout.html">🔒</a> <a class="no-select" href="/logout.html">🔒</a>
</nav> </nav>

View File

@ -82,8 +82,8 @@ class PushView(BaseFormView):
test_payload = { test_payload = {
"title": "Hey retoor", "title": "Hey retoor",
"message": "Guess what? ;P", "message": "Guess what? ;P",
"icon": "https://molodetz.online/image/snek192.png", "icon": "/image/snek192.png",
"url": "https://localhost:8081", "url": "/web.html",
} }
encryped = notifications.create_encrypted_payload( encryped = notifications.create_encrypted_payload(