# retoor from datetime import datetime, timedelta from typing import Optional import secrets import hashlib from fastapi import Request, HTTPException from fastapi.responses import RedirectResponse from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from mywebdav.settings import settings class AdminSessionManager: def __init__(self): self.serializer = URLSafeTimedSerializer(settings.ADMIN_SESSION_SECRET) self.cookie_name = "admin_session" self.max_age = settings.ADMIN_SESSION_EXPIRE_HOURS * 3600 def create_session(self, username: str) -> str: data = { "username": username, "created": datetime.utcnow().isoformat(), "nonce": secrets.token_hex(8) } return self.serializer.dumps(data) def verify_session(self, token: str) -> Optional[dict]: try: data = self.serializer.loads(token, max_age=self.max_age) return data except (BadSignature, SignatureExpired): return None def get_session_from_request(self, request: Request) -> Optional[dict]: token = request.cookies.get(self.cookie_name) if not token: return None return self.verify_session(token) session_manager = AdminSessionManager() def verify_admin_credentials(username: str, password: str) -> bool: expected_username = settings.ADMIN_USERNAME expected_password = settings.ADMIN_PASSWORD username_hash = hashlib.sha256(username.encode()).digest() expected_hash = hashlib.sha256(expected_username.encode()).digest() username_match = secrets.compare_digest(username_hash, expected_hash) password_hash = hashlib.sha256(password.encode()).digest() expected_pw_hash = hashlib.sha256(expected_password.encode()).digest() password_match = secrets.compare_digest(password_hash, expected_pw_hash) return username_match and password_match def get_admin_session(request: Request) -> Optional[dict]: return session_manager.get_session_from_request(request) def require_admin(request: Request) -> dict: session = get_admin_session(request) if not session: raise HTTPException(status_code=303, headers={"Location": "/manage/login"}) return session def create_session_response(response: RedirectResponse, username: str) -> RedirectResponse: token = session_manager.create_session(username) response.set_cookie( key=session_manager.cookie_name, value=token, max_age=session_manager.max_age, httponly=True, samesite="lax", secure=False ) return response def clear_session_response(response: RedirectResponse) -> RedirectResponse: response.delete_cookie(key=session_manager.cookie_name) return response def generate_csrf_token(session: dict) -> str: data = f"{session.get('nonce', '')}{settings.ADMIN_SESSION_SECRET}" return hashlib.sha256(data.encode()).hexdigest()[:32] def verify_csrf_token(session: dict, token: str) -> bool: expected = generate_csrf_token(session) return secrets.compare_digest(expected, token)