mirror of
https://github.com/mealie-recipes/mealie.git
synced 2026-02-26 17:53:12 -05:00
fix: group creation (#1126)
* fix: unify group creation - closes #1100 * tests: disable password hashing during testing * tests: fix email config tests
This commit is contained in:
1
mealie/core/security/__init__.py
Normal file
1
mealie/core/security/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .security import *
|
||||
43
mealie/core/security/hasher.py
Normal file
43
mealie/core/security/hasher.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from functools import lru_cache
|
||||
from typing import Protocol
|
||||
|
||||
from passlib.context import CryptContext
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
|
||||
|
||||
class Hasher(Protocol):
|
||||
def hash(self, password: str) -> str:
|
||||
...
|
||||
|
||||
def verify(self, password: str, hashed: str) -> bool:
|
||||
...
|
||||
|
||||
|
||||
class FakeHasher:
|
||||
def hash(self, password: str) -> str:
|
||||
return password
|
||||
|
||||
def verify(self, password: str, hashed: str) -> bool:
|
||||
return password == hashed
|
||||
|
||||
|
||||
class PasslibHasher:
|
||||
def __init__(self) -> None:
|
||||
self.ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
def hash(self, password: str) -> str:
|
||||
return self.ctx.hash(password)
|
||||
|
||||
def verify(self, password: str, hashed: str) -> bool:
|
||||
return self.ctx.verify(password, hashed)
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_hasher() -> Hasher:
|
||||
settings = get_app_settings()
|
||||
|
||||
if settings.TESTING:
|
||||
return FakeHasher()
|
||||
|
||||
return PasslibHasher()
|
||||
108
mealie/core/security/security.py
Normal file
108
mealie/core/security/security.py
Normal file
@@ -0,0 +1,108 @@
|
||||
import secrets
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from jose import jwt
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.core.security.hasher import get_hasher
|
||||
from mealie.repos.all_repositories import get_repositories
|
||||
from mealie.repos.repository_factory import AllRepositories
|
||||
from mealie.schema.user import PrivateUser
|
||||
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
|
||||
settings = get_app_settings()
|
||||
|
||||
to_encode = data.copy()
|
||||
expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME)
|
||||
|
||||
expire = datetime.now(timezone.utc) + expires_delta
|
||||
|
||||
to_encode["exp"] = expire
|
||||
return jwt.encode(to_encode, settings.SECRET, algorithm=ALGORITHM)
|
||||
|
||||
|
||||
def create_file_token(file_path: Path) -> str:
|
||||
token_data = {"file": str(file_path)}
|
||||
return create_access_token(token_data, expires_delta=timedelta(minutes=30))
|
||||
|
||||
|
||||
def create_recipe_slug_token(file_path: str | Path) -> str:
|
||||
token_data = {"slug": str(file_path)}
|
||||
return create_access_token(token_data, expires_delta=timedelta(minutes=30))
|
||||
|
||||
|
||||
def user_from_ldap(db: AllRepositories, session, username: str, password: str) -> PrivateUser | bool:
|
||||
"""Given a username and password, tries to authenticate by BINDing to an
|
||||
LDAP server
|
||||
|
||||
If the BIND succeeds, it will either create a new user of that username on
|
||||
the server or return an existing one.
|
||||
Returns False on failure.
|
||||
"""
|
||||
import ldap
|
||||
|
||||
settings = get_app_settings()
|
||||
|
||||
conn = ldap.initialize(settings.LDAP_SERVER_URL)
|
||||
user_dn = settings.LDAP_BIND_TEMPLATE.format(username)
|
||||
try:
|
||||
conn.simple_bind_s(user_dn, password)
|
||||
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
|
||||
return False
|
||||
|
||||
user = db.users.get_one(username, "username", any_case=True)
|
||||
if not user:
|
||||
user = db.users.create(
|
||||
{
|
||||
"username": username,
|
||||
"password": "LDAP",
|
||||
# Fill the next two values with something unique and vaguely
|
||||
# relevant
|
||||
"full_name": username,
|
||||
"email": username,
|
||||
"admin": False,
|
||||
},
|
||||
)
|
||||
|
||||
if settings.LDAP_ADMIN_FILTER:
|
||||
user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
|
||||
db.users.update(user.id, user)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def authenticate_user(session, email: str, password: str) -> PrivateUser | bool:
|
||||
settings = get_app_settings()
|
||||
|
||||
db = get_repositories(session)
|
||||
user: PrivateUser = db.users.get(email, "email", any_case=True)
|
||||
|
||||
if not user:
|
||||
user = db.users.get(email, "username", any_case=True)
|
||||
|
||||
if settings.LDAP_AUTH_ENABLED and (not user or user.password == "LDAP"):
|
||||
return user_from_ldap(db, session, email, password)
|
||||
|
||||
if not user or not verify_password(password, user.password):
|
||||
return False
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
"""Compares a plain string to a hashed password"""
|
||||
return get_hasher().verify(plain_password, hashed_password)
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""Takes in a raw password and hashes it. Used prior to saving a new password to the database."""
|
||||
return get_hasher().hash(password)
|
||||
|
||||
|
||||
def url_safe_token() -> str:
|
||||
"""Generates a cryptographic token without embedded data. Used for password reset tokens and invitation tokens"""
|
||||
return secrets.token_urlsafe(24)
|
||||
Reference in New Issue
Block a user