2025-11-09 23:29:07 +01:00
from datetime import timedelta
2025-11-10 01:58:41 +01:00
from typing import Optional , List
2025-11-09 23:29:07 +01:00
from fastapi import APIRouter , Depends , HTTPException , status
2025-11-10 01:58:41 +01:00
from pydantic import BaseModel
2025-11-09 23:29:07 +01:00
2025-11-13 23:22:05 +01:00
from . . auth import (
authenticate_user ,
create_access_token ,
get_password_hash ,
get_current_user ,
get_current_verified_user ,
verify_password ,
)
2025-11-09 23:29:07 +01:00
from . . models import User
2025-11-13 23:22:05 +01:00
from . . schemas import Token , UserCreate
2025-11-10 01:58:41 +01:00
from . . two_factor import (
2025-11-13 23:22:05 +01:00
generate_totp_secret ,
generate_totp_uri ,
generate_qr_code_base64 ,
verify_totp_code ,
generate_recovery_codes ,
hash_recovery_codes ,
2025-11-10 01:58:41 +01:00
)
2025-11-09 23:29:07 +01:00
router = APIRouter (
prefix = " /auth " ,
tags = [ " auth " ] ,
)
2025-11-13 23:22:05 +01:00
2025-11-11 01:05:13 +01:00
class LoginRequest ( BaseModel ) :
username : str
password : str
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
class TwoFactorLogin ( BaseModel ) :
username : str
password : str
two_factor_code : Optional [ str ] = None
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
class TwoFactorSetupResponse ( BaseModel ) :
secret : str
qr_code_base64 : str
recovery_codes : List [ str ]
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
class TwoFactorCode ( BaseModel ) :
two_factor_code : str
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
class TwoFactorDisable ( BaseModel ) :
password : str
two_factor_code : str
2025-11-13 23:22:05 +01:00
2025-11-09 23:29:07 +01:00
@router.post ( " /register " , response_model = Token )
async def register_user ( user_in : UserCreate ) :
user = await User . get_or_none ( username = user_in . username )
if user :
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST ,
detail = " Username already registered " ,
)
user = await User . get_or_none ( email = user_in . email )
if user :
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST ,
detail = " Email already registered " ,
)
hashed_password = get_password_hash ( user_in . password )
user = await User . create (
username = user_in . username ,
email = user_in . email ,
hashed_password = hashed_password ,
)
2025-11-11 12:47:26 +01:00
# Send welcome email
from . . mail import queue_email
2025-11-13 23:22:05 +01:00
2025-11-11 12:47:26 +01:00
queue_email (
to_email = user . email ,
2025-11-13 12:05:05 +01:00
subject = " Welcome to MyWebdav! " ,
body = f " Hi { user . username } , \n \n Welcome to MyWebdav! Your account has been created successfully. \n \n Best regards, \n The MyWebdav Team " ,
2025-11-13 23:22:05 +01:00
html = f " <h1>Welcome to MyWebdav!</h1><p>Hi { user . username } ,</p><p>Welcome to MyWebdav! Your account has been created successfully.</p><p>Best regards,<br>The MyWebdav Team</p> " ,
2025-11-11 12:47:26 +01:00
)
2025-11-13 23:22:05 +01:00
access_token_expires = timedelta ( minutes = 30 ) # Use settings
2025-11-09 23:29:07 +01:00
access_token = create_access_token (
data = { " sub " : user . username } , expires_delta = access_token_expires
)
return { " access_token " : access_token , " token_type " : " bearer " }
2025-11-13 23:22:05 +01:00
2025-11-09 23:29:07 +01:00
@router.post ( " /token " , response_model = Token )
2025-11-11 01:05:13 +01:00
async def login_for_access_token ( login_data : LoginRequest ) :
2025-11-13 23:22:05 +01:00
auth_result = await authenticate_user (
login_data . username , login_data . password , None
)
2025-11-10 15:46:40 +01:00
2025-11-10 01:58:41 +01:00
if not auth_result :
2025-11-09 23:29:07 +01:00
raise HTTPException (
status_code = status . HTTP_401_UNAUTHORIZED ,
2025-11-10 15:46:40 +01:00
detail = " Incorrect username or password " ,
2025-11-09 23:29:07 +01:00
headers = { " WWW-Authenticate " : " Bearer " } ,
)
2025-11-10 15:46:40 +01:00
2025-11-10 01:58:41 +01:00
user = auth_result [ " user " ]
if auth_result [ " 2fa_required " ] :
raise HTTPException (
status_code = status . HTTP_403_FORBIDDEN ,
detail = " Two-factor authentication required " ,
headers = { " X-2FA-Required " : " true " } ,
)
2025-11-11 01:05:13 +01:00
access_token_expires = timedelta ( minutes = 30 )
2025-11-09 23:29:07 +01:00
access_token = create_access_token (
2025-11-13 23:22:05 +01:00
data = { " sub " : user . username } ,
expires_delta = access_token_expires ,
two_factor_verified = True ,
2025-11-10 01:58:41 +01:00
)
return { " access_token " : access_token , " token_type " : " bearer " }
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
@router.post ( " /2fa/setup " , response_model = TwoFactorSetupResponse )
2025-11-13 23:22:05 +01:00
async def setup_two_factor_authentication (
current_user : User = Depends ( get_current_user ) ,
) :
2025-11-10 01:58:41 +01:00
if current_user . is_2fa_enabled :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST , detail = " 2FA is already enabled. "
)
2025-11-10 01:58:41 +01:00
if current_user . two_factor_secret :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST ,
detail = " 2FA setup already initiated. Verify or disable first. " ,
)
2025-11-10 01:58:41 +01:00
secret = generate_totp_secret ( )
current_user . two_factor_secret = secret
await current_user . save ( )
2025-11-13 20:50:03 +01:00
totp_uri = generate_totp_uri ( secret , current_user . email , " MyWebdav " )
2025-11-10 01:58:41 +01:00
qr_code_base64 = generate_qr_code_base64 ( totp_uri )
recovery_codes = generate_recovery_codes ( )
hashed_recovery_codes = hash_recovery_codes ( recovery_codes )
current_user . recovery_codes = " , " . join ( hashed_recovery_codes )
await current_user . save ( )
2025-11-13 23:22:05 +01:00
return TwoFactorSetupResponse (
secret = secret , qr_code_base64 = qr_code_base64 , recovery_codes = recovery_codes
)
2025-11-10 01:58:41 +01:00
@router.post ( " /2fa/verify " , response_model = Token )
2025-11-13 23:22:05 +01:00
async def verify_two_factor_authentication (
two_factor_code_data : TwoFactorCode , current_user : User = Depends ( get_current_user )
) :
2025-11-10 01:58:41 +01:00
if current_user . is_2fa_enabled :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST , detail = " 2FA is already enabled. "
)
2025-11-10 01:58:41 +01:00
if not current_user . two_factor_secret :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST , detail = " 2FA setup not initiated. "
)
2025-11-10 01:58:41 +01:00
2025-11-13 23:22:05 +01:00
if not verify_totp_code (
current_user . two_factor_secret , two_factor_code_data . two_factor_code
) :
raise HTTPException (
status_code = status . HTTP_401_UNAUTHORIZED , detail = " Invalid 2FA code. "
)
2025-11-10 01:58:41 +01:00
current_user . is_2fa_enabled = True
await current_user . save ( )
2025-11-13 23:22:05 +01:00
access_token_expires = timedelta ( minutes = 30 ) # Use settings
2025-11-10 01:58:41 +01:00
access_token = create_access_token (
2025-11-13 23:22:05 +01:00
data = { " sub " : current_user . username } ,
expires_delta = access_token_expires ,
two_factor_verified = True ,
2025-11-09 23:29:07 +01:00
)
return { " access_token " : access_token , " token_type " : " bearer " }
2025-11-10 01:58:41 +01:00
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
@router.post ( " /2fa/disable " , response_model = dict )
2025-11-13 23:22:05 +01:00
async def disable_two_factor_authentication (
disable_data : TwoFactorDisable ,
current_user : User = Depends ( get_current_verified_user ) ,
) :
2025-11-10 01:58:41 +01:00
if not current_user . is_2fa_enabled :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST , detail = " 2FA is not enabled. "
)
2025-11-10 01:58:41 +01:00
# Verify password
if not verify_password ( disable_data . password , current_user . hashed_password ) :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_401_UNAUTHORIZED , detail = " Invalid password. "
)
2025-11-10 01:58:41 +01:00
# Verify 2FA code
2025-11-13 23:22:05 +01:00
if not verify_totp_code (
current_user . two_factor_secret , disable_data . two_factor_code
) :
raise HTTPException (
status_code = status . HTTP_401_UNAUTHORIZED , detail = " Invalid 2FA code. "
)
2025-11-10 01:58:41 +01:00
current_user . two_factor_secret = None
current_user . is_2fa_enabled = False
current_user . recovery_codes = None
await current_user . save ( )
return { " message " : " 2FA disabled successfully. " }
2025-11-13 23:22:05 +01:00
2025-11-10 01:58:41 +01:00
@router.get ( " /2fa/recovery-codes " , response_model = List [ str ] )
2025-11-13 23:22:05 +01:00
async def get_new_recovery_codes (
current_user : User = Depends ( get_current_verified_user ) ,
) :
2025-11-10 01:58:41 +01:00
if not current_user . is_2fa_enabled :
2025-11-13 23:22:05 +01:00
raise HTTPException (
status_code = status . HTTP_400_BAD_REQUEST , detail = " 2FA is not enabled. "
)
2025-11-10 01:58:41 +01:00
recovery_codes = generate_recovery_codes ( )
hashed_recovery_codes = hash_recovery_codes ( recovery_codes )
current_user . recovery_codes = " , " . join ( hashed_recovery_codes )
await current_user . save ( )
return recovery_codes