Feature/new-login-page (#989)

* login page refresh

* use user_id for token identification
This commit is contained in:
Hayden
2022-02-22 11:36:58 -09:00
committed by GitHub
parent 684f39fe24
commit 177a430d8c
5 changed files with 103 additions and 179 deletions

View File

@@ -36,18 +36,17 @@ async def is_logged_in(token: str = Depends(oauth2_scheme_soft_fail), session=De
"""
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
username: str = payload.get("sub")
user_id: str = payload.get("sub")
long_token: str = payload.get("long_token")
if long_token is not None:
try:
user = validate_long_live_token(session, token, payload.get("id"))
if user:
if validate_long_live_token(session, token, payload.get("id")):
return True
except Exception:
return False
return username is not None
return user_id is not None
except Exception:
return False
@@ -61,37 +60,38 @@ async def get_current_user(token: str = Depends(oauth2_scheme), session=Depends(
)
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
username: str = payload.get("sub")
user_id: str = payload.get("sub")
long_token: str = payload.get("long_token")
if long_token is not None:
return validate_long_live_token(session, token, payload.get("id"))
if username is None:
if user_id is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
token_data = TokenData(user_id=user_id)
except JWTError as e:
raise credentials_exception from e
db = get_repositories(session)
repos = get_repositories(session)
user = repos.users.get(token_data.user_id, "id", any_case=False)
user = db.users.get(token_data.username, "email", any_case=True)
if user is None:
raise credentials_exception
return user
async def get_admin_user(current_user=Depends(get_current_user)) -> PrivateUser:
async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser:
if not current_user.admin:
raise HTTPException(status.HTTP_403_FORBIDDEN)
return current_user
def validate_long_live_token(session: Session, client_token: str, id: int) -> PrivateUser:
db = get_repositories(session)
repos = get_repositories(session)
tokens: list[LongLiveTokenInDB] = db.api_tokens.get(id, "user_id", limit=9999)
tokens: list[LongLiveTokenInDB] = repos.api_tokens.get(id, "user_id", limit=9999)
for token in tokens:
token: LongLiveTokenInDB
@@ -110,8 +110,8 @@ def validate_file_token(token: Optional[str] = None) -> Path:
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
file_path = Path(payload.get("file"))
except JWTError:
raise credentials_exception
except JWTError as e:
raise credentials_exception from e
return file_path
@@ -127,8 +127,8 @@ def validate_recipe_token(token: Optional[str] = None) -> str:
try:
payload = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM])
slug = payload.get("slug")
except JWTError:
raise credentials_exception
except JWTError as e:
raise credentials_exception from e
return slug

View File

@@ -48,10 +48,7 @@ class MealieAuthToken(BaseModel):
@public_router.post("/token")
def get_token(
data: CustomOAuth2Form = Depends(),
session: Session = Depends(generate_session),
):
def get_token(data: CustomOAuth2Form = Depends(), session: Session = Depends(generate_session)):
email = data.username
password = data.password
@@ -64,12 +61,12 @@ def get_token(
)
duration = timedelta(days=14) if data.remember_me else None
access_token = security.create_access_token(dict(sub=user.email), duration)
access_token = security.create_access_token(dict(sub=str(user.id)), duration)
return MealieAuthToken.respond(access_token)
@user_router.get("/refresh")
async def refresh_token(current_user: PrivateUser = Depends(get_current_user)):
"""Use a valid token to get another token"""
access_token = security.create_access_token(data=dict(sub=current_user.email))
access_token = security.create_access_token(data=dict(sub=str(current_user.id)))
return MealieAuthToken.respond(access_token)

View File

@@ -66,7 +66,7 @@ class UserController(BaseUserController):
self.repos.users.update(item_id, new_data.dict())
if self.user.id == item_id:
access_token = security.create_access_token(data=dict(sub=new_data.email))
access_token = security.create_access_token(data=dict(sub=str(self.user.id)))
return {"access_token": access_token, "token_type": "bearer"}
@user_router.put("/{item_id}/password")

View File

@@ -1,6 +1,6 @@
from typing import Optional
from pydantic import BaseModel
from pydantic import UUID4, BaseModel
from pydantic.types import constr
@@ -10,4 +10,5 @@ class Token(BaseModel):
class TokenData(BaseModel):
user_id: Optional[UUID4]
username: Optional[constr(to_lower=True, strip_whitespace=True)] = None