Implement login with secure password rehashing
This commit is contained in:
		
							parent
							
								
									f37b7392e2
								
							
						
					
					
						commit
						3bb09dbaea
					
				@ -1,13 +1,13 @@
 | 
			
		||||
from fastapi import FastAPI
 | 
			
		||||
from fastapi.middleware.cors import CORSMiddleware
 | 
			
		||||
from app.routes import router
 | 
			
		||||
from constants import origins
 | 
			
		||||
from constants import ORIGINS
 | 
			
		||||
 | 
			
		||||
app = FastAPI()
 | 
			
		||||
app.include_router(router)
 | 
			
		||||
app.add_middleware(
 | 
			
		||||
    CORSMiddleware,
 | 
			
		||||
    allow_origins=origins,
 | 
			
		||||
    allow_origins=ORIGINS,
 | 
			
		||||
    allow_credentials=True,
 | 
			
		||||
    allow_methods=["*"],
 | 
			
		||||
    allow_headers=["*"],
 | 
			
		||||
 | 
			
		||||
@ -15,12 +15,10 @@ def add_user(data: UserCreate, db: Session = Depends(get_db)):
 | 
			
		||||
    return user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO Use OAuth2 for verification
 | 
			
		||||
@router.post("/login", response_model=UserLoginResponse)
 | 
			
		||||
def log_in(
 | 
			
		||||
    data: UserLogin, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme),
 | 
			
		||||
):
 | 
			
		||||
    pass
 | 
			
		||||
def login(data: UserLogin, db: Session = Depends(get_db)):
 | 
			
		||||
    response = authenticate_user(data=data, db=db)
 | 
			
		||||
    return response
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@router.post("/otpVerification", response_model=OTPVerifyResponse)
 | 
			
		||||
 | 
			
		||||
@ -1,13 +1,15 @@
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from fastapi import HTTPException
 | 
			
		||||
from hashlib import sha1
 | 
			
		||||
from passlib.context import CryptContext
 | 
			
		||||
 | 
			
		||||
from app.schemas import *
 | 
			
		||||
from constants import SHA1_SALT
 | 
			
		||||
from database import SessionLocal
 | 
			
		||||
from database.models import *
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
 | 
			
		||||
pwd_context = CryptContext(schemes=["bcrypt"])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_db():
 | 
			
		||||
@ -60,7 +62,55 @@ def update_otp(data: OTPResend, db):
 | 
			
		||||
    db.commit()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    )
 | 
			
		||||
def rehash_password(password):
 | 
			
		||||
    return pwd_context.hash(secret=password)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_password_hash(user, password, db):
 | 
			
		||||
    new_hash = rehash_password(password=password)
 | 
			
		||||
    db.query(Users).filter(Users.email == user.email).update({Users.password: new_hash})
 | 
			
		||||
    db.commit()
 | 
			
		||||
    db.refresh(user)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_sha1_hash(db_hash):
 | 
			
		||||
    hash_length = len(db_hash)
 | 
			
		||||
    sha1_length = 40
 | 
			
		||||
    if hash_length == sha1_length:
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def verify_legacy_password(user, password, db):
 | 
			
		||||
    hash = SHA1_SALT + password
 | 
			
		||||
    correct_password = user.password == sha1(hash.encode("utf-8")).hexdigest()
 | 
			
		||||
    if correct_password:
 | 
			
		||||
        update_password_hash(user=user, password=password, db=db)
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def verify_updated_password(user, password):
 | 
			
		||||
    return pwd_context.verify(secret=password, hash=user.password)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def verify_password(user, password, db):
 | 
			
		||||
    legacy_hash = check_sha1_hash(user.password)
 | 
			
		||||
    if legacy_hash:
 | 
			
		||||
        return verify_legacy_password(user=user, password=password, db=db)
 | 
			
		||||
    return verify_updated_password(user=user, password=password)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def authenticate_user(data: UserLogin, db):
 | 
			
		||||
    user = fetch_user_by_email(data=data, db=db)
 | 
			
		||||
    if not user:
 | 
			
		||||
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 | 
			
		||||
    correct_password = verify_password(user=user, password=data.password, db=db)
 | 
			
		||||
    if not correct_password:
 | 
			
		||||
        raise HTTPException(status_code=400, detail="Incorrect username or password")
 | 
			
		||||
    valid_account = user.status
 | 
			
		||||
    if not valid_account:
 | 
			
		||||
        raise HTTPException(status_code=400, detail="Your account is not active")
 | 
			
		||||
    return user
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,4 +1,3 @@
 | 
			
		||||
from pytest import mark
 | 
			
		||||
from secrets import token_hex
 | 
			
		||||
 | 
			
		||||
from app.schemas import *
 | 
			
		||||
@ -24,19 +23,6 @@ def test_registration():
 | 
			
		||||
    assert response.status_code == 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@mark.skip(reason="not implemented")
 | 
			
		||||
def test_login():
 | 
			
		||||
    user = {
 | 
			
		||||
        "email": "12@gmail.com",
 | 
			
		||||
        "password": "odyfo2020",
 | 
			
		||||
        "device_id": "fEll6hxazGQ:APA91bFpsB44ZHgjUItYOKTTmUxxkJsWiuaeojdxiTLVbz-AwN90XwLvpA6nRQoLrUYaF-HoHTz4Vc5S0VlqemerJ6MjG4zqwfNYB75whQVQI1M29yhMc3oFdl1me2zP_RY2dXbfx7UW",
 | 
			
		||||
        "lang_type": 2,
 | 
			
		||||
        "user_type": 1,
 | 
			
		||||
    }
 | 
			
		||||
    response = client.post("/login", json=user)
 | 
			
		||||
    assert response.status_code == 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_otp_verification(get_test_db):
 | 
			
		||||
    user = get_test_db.query(Users).filter(Users.email == "oyvey@hotmail.com").first()
 | 
			
		||||
    data = {
 | 
			
		||||
@ -47,6 +33,18 @@ def test_otp_verification(get_test_db):
 | 
			
		||||
    assert response.status_code == 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_login():
 | 
			
		||||
    user = {
 | 
			
		||||
        "email": "testorganizer@odyfo.com",
 | 
			
		||||
        "password": "odyfo2020",
 | 
			
		||||
        "device_id": "0",
 | 
			
		||||
        "lang_type": 1,
 | 
			
		||||
        "user_type": 2,
 | 
			
		||||
    }
 | 
			
		||||
    response = client.post("/login", json=user)
 | 
			
		||||
    assert response.status_code == 200
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_resend_otp():
 | 
			
		||||
    data = {
 | 
			
		||||
        "email": "testorganizer@odyfo.com",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user