# retoor <retoor@molodetz.nl>
from datetime import datetime, timedelta
from typing import Optional
import secrets
import hashlib
from fastapi import Request, HTTPException
from fastapi.responses import RedirectResponse
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from mywebdav.settings import settings
class AdminSessionManager:
    """Create and validate admin session tokens.

    Tokens are produced by a ``URLSafeTimedSerializer`` keyed with
    ``settings.ADMIN_SESSION_SECRET`` and are read back from the
    ``admin_session`` request cookie.
    """

    def __init__(self):
        """Build the serializer and derive the session lifetime from settings."""
        self.serializer = URLSafeTimedSerializer(settings.ADMIN_SESSION_SECRET)
        # Name of the cookie that carries the serialized session token.
        self.cookie_name = "admin_session"
        # The setting is expressed in hours; max_age is kept in seconds.
        self.max_age = settings.ADMIN_SESSION_EXPIRE_HOURS * 3600

    def create_session(self, username: str) -> str:
        """Return a serialized session token for *username*.

        The payload holds the username, the current ``datetime.utcnow()``
        value in ISO format, and a random 16-hex-character nonce.
        """
        payload = dict(
            username=username,
            created=datetime.utcnow().isoformat(),
            nonce=secrets.token_hex(8),
        )
        return self.serializer.dumps(payload)

    def verify_session(self, token: str) -> Optional[dict]:
        """Return the payload loaded from *token*, or ``None`` if it is rejected.

        The token is loaded with ``max_age=self.max_age``; ``BadSignature``
        and ``SignatureExpired`` from the serializer are turned into ``None``.
        """
        try:
            return self.serializer.loads(token, max_age=self.max_age)
        except (BadSignature, SignatureExpired):
            return None

    def get_session_from_request(self, request: Request) -> Optional[dict]:
        """Return the verified session stored in the request cookie, if any.

        A missing or empty cookie yields ``None`` without attempting
        verification.
        """
        cookie_value = request.cookies.get(self.cookie_name)
        return self.verify_session(cookie_value) if cookie_value else None
# Module-level manager instance used by the session helper functions in this
# file. It is created at import time, so the settings it reads must be
# available when this module is imported.
session_manager = AdminSessionManager()
def verify_admin_credentials(username: str, password: str) -> bool:
    """Return True when both *username* and *password* match the configured admin.

    Each value is reduced to its SHA-256 digest before being compared with
    ``secrets.compare_digest``, so the comparison always runs over
    equal-length byte strings. Both comparisons are evaluated before the
    results are combined.
    """
    expected_username = settings.ADMIN_USERNAME
    expected_password = settings.ADMIN_PASSWORD

    def _sha256(text: str) -> bytes:
        # Digest of the default-encoded (UTF-8) text.
        return hashlib.sha256(text.encode()).digest()

    username_ok = secrets.compare_digest(
        _sha256(username), _sha256(expected_username)
    )
    password_ok = secrets.compare_digest(
        _sha256(password), _sha256(expected_password)
    )
    return username_ok and password_ok
def get_admin_session(request: Request) -> Optional[dict]:
    """Return the admin session for *request*, or ``None`` if there is none.

    Delegates to the module-level ``session_manager``.
    """
    session = session_manager.get_session_from_request(request)
    return session
def require_admin(request: Request) -> dict:
    """Return the admin session for *request* or redirect to the login page.

    Raises:
        HTTPException: with status 303 and a ``Location`` header of
            ``/manage/login`` when the request carries no valid session.
    """
    session = get_admin_session(request)
    if session:
        return session
    # The redirect is expressed as an exception carrying a Location header.
    raise HTTPException(status_code=303, headers={"Location": "/manage/login"})
def create_session_response(response: RedirectResponse, username: str) -> RedirectResponse:
    """Attach a new admin session cookie for *username* to *response*.

    The cookie is named after ``session_manager.cookie_name``, lives for
    ``session_manager.max_age`` seconds, and is set HttpOnly with
    SameSite=Lax. The same *response* object is returned.
    """
    signed_token = session_manager.create_session(username)
    cookie_options = dict(
        key=session_manager.cookie_name,
        value=signed_token,
        max_age=session_manager.max_age,
        httponly=True,
        samesite="lax",
        # NOTE(review): secure=False allows the cookie over plain HTTP;
        # confirm whether this should be configurable for HTTPS deployments.
        secure=False,
    )
    response.set_cookie(**cookie_options)
    return response
def clear_session_response(response: RedirectResponse) -> RedirectResponse:
    """Delete the admin session cookie on *response* and return *response*."""
    cookie_name = session_manager.cookie_name
    response.delete_cookie(key=cookie_name)
    return response
def generate_csrf_token(session: dict) -> str:
    """Derive the CSRF token for *session*.

    The token is the first 32 hex characters of the SHA-256 digest of the
    session's ``nonce`` (empty string when absent) followed by
    ``settings.ADMIN_SESSION_SECRET``.
    """
    nonce = session.get('nonce', '')
    material = f"{nonce}{settings.ADMIN_SESSION_SECRET}".encode()
    hex_digest = hashlib.sha256(material).hexdigest()
    return hex_digest[:32]
def verify_csrf_token(session: dict, token: str) -> bool:
    """Return True when *token* equals the CSRF token derived from *session*.

    The comparison uses ``secrets.compare_digest``. Because *token* is
    supplied by the client, malformed values are rejected with ``False``
    instead of raising: a non-string token (for example ``None`` when the
    field is missing) and a token containing non-ASCII characters both
    return ``False``.

    Args:
        session: Session payload; its ``nonce`` feeds ``generate_csrf_token``.
        token: The CSRF token received from the client.

    Returns:
        True only if *token* matches the expected token exactly.
    """
    expected = generate_csrf_token(session)
    if not isinstance(token, str):
        return False
    # compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters, so compare the encoded bytes instead.
    # "surrogatepass" keeps encoding from failing on lone surrogates.
    return secrets.compare_digest(
        expected.encode("utf-8"),
        token.encode("utf-8", "surrogatepass"),
    )