mirror of
https://github.com/mealie-recipes/mealie.git
synced 2025-12-14 22:35:17 -05:00
fix(backend): 🐛 Grab PRs from dev branch (#826)
* fix(backend): 🐛 Grab PR #780 * feat(frontend): ✨ Grab PR 797 * docs(docs): spelling * feat(backend): ✨ Add LDAP Support from #803 * add test deps Co-authored-by: hay-kot <hay-kot@pm.me>
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
@@ -6,16 +8,17 @@ from jose import jwt
|
||||
from passlib.context import CryptContext
|
||||
|
||||
from mealie.core.config import get_app_settings
|
||||
from mealie.db.data_access_layer.access_model_factory import Database
|
||||
from mealie.db.database import get_database
|
||||
from mealie.schema.user import PrivateUser
|
||||
|
||||
settings = get_app_settings()
|
||||
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
|
||||
def create_access_token(data: dict(), expires_delta: timedelta = None) -> str:
|
||||
settings = get_app_settings()
|
||||
|
||||
to_encode = data.copy()
|
||||
expires_delta = expires_delta or timedelta(hours=settings.TOKEN_TIME)
|
||||
|
||||
@@ -35,18 +38,61 @@ def create_recipe_slug_token(file_path: str) -> str:
|
||||
return create_access_token(token_data, expires_delta=timedelta(minutes=30))
|
||||
|
||||
|
||||
def authenticate_user(session, email: str, password: str) -> PrivateUser:
|
||||
db = get_database(session)
|
||||
def user_from_ldap(db: Database, session, username: str, password: str) -> PrivateUser:
|
||||
"""Given a username and password, tries to authenticate by BINDing to an
|
||||
LDAP server
|
||||
|
||||
If the BIND succeeds, it will either create a new user of that username on
|
||||
the server or return an existing one.
|
||||
Returns False on failure.
|
||||
"""
|
||||
import ldap
|
||||
|
||||
settings = get_app_settings()
|
||||
|
||||
conn = ldap.initialize(settings.LDAP_SERVER_URL)
|
||||
user_dn = settings.LDAP_BIND_TEMPLATE.format(username)
|
||||
try:
|
||||
conn.simple_bind_s(user_dn, password)
|
||||
except (ldap.INVALID_CREDENTIALS, ldap.NO_SUCH_OBJECT):
|
||||
return False
|
||||
|
||||
user = db.users.get_one(username, "username", any_case=True)
|
||||
if not user:
|
||||
user = db.users.create(
|
||||
{
|
||||
"username": username,
|
||||
"password": "LDAP",
|
||||
# Fill the next two values with something unique and vaguely
|
||||
# relevant
|
||||
"full_name": username,
|
||||
"email": username,
|
||||
"admin": False,
|
||||
},
|
||||
)
|
||||
|
||||
if settings.LDAP_ADMIN_FILTER:
|
||||
user.admin = len(conn.search_s(user_dn, ldap.SCOPE_BASE, settings.LDAP_ADMIN_FILTER, [])) > 0
|
||||
db.users.update(user.id, user)
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def authenticate_user(session, email: str, password: str) -> PrivateUser | False:
|
||||
settings = get_app_settings()
|
||||
|
||||
db = get_database(session)
|
||||
user: PrivateUser = db.users.get(email, "email", any_case=True)
|
||||
|
||||
if not user:
|
||||
user = db.users.get(email, "username", any_case=True)
|
||||
if not user:
|
||||
|
||||
if settings.LDAP_AUTH_ENABLED and (not user or user.password == "LDAP"):
|
||||
return user_from_ldap(db, session, email, password)
|
||||
|
||||
if not user or not verify_password(password, user.password):
|
||||
return False
|
||||
|
||||
if not verify_password(password, user.password):
|
||||
return False
|
||||
return user
|
||||
|
||||
|
||||
|
||||
@@ -83,6 +83,25 @@ class AppSettings(BaseSettings):
|
||||
|
||||
return "" not in required and None not in required
|
||||
|
||||
# ===============================================
|
||||
# LDAP Configuration
|
||||
|
||||
LDAP_AUTH_ENABLED: bool = False
|
||||
LDAP_SERVER_URL: str = None
|
||||
LDAP_BIND_TEMPLATE: str = None
|
||||
LDAP_ADMIN_FILTER: str = None
|
||||
|
||||
@property
|
||||
def LDAP_ENABLED(self) -> bool:
|
||||
"""Validates LDAP settings are all set"""
|
||||
required = {
|
||||
self.LDAP_SERVER_URL,
|
||||
self.LDAP_BIND_TEMPLATE,
|
||||
self.LDAP_ADMIN_FILTER,
|
||||
}
|
||||
|
||||
return "" not in required and None not in required and self.LDAP_AUTH_ENABLED
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
|
||||
@@ -43,6 +43,8 @@ def write_image(recipe_slug: str, file_data: bytes, extension: str) -> Path:
|
||||
|
||||
def scrape_image(image_url: str, slug: str) -> Path:
|
||||
logger.info(f"Image URL: {image_url}")
|
||||
_FIREFOX_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0"
|
||||
|
||||
if isinstance(image_url, str): # Handles String Types
|
||||
pass
|
||||
|
||||
@@ -54,7 +56,7 @@ def scrape_image(image_url: str, slug: str) -> Path:
|
||||
all_image_requests = []
|
||||
for url in image_url:
|
||||
try:
|
||||
r = requests.get(url, stream=True, headers={"User-Agent": ""})
|
||||
r = requests.get(url, stream=True, headers={"User-Agent": _FIREFOX_UA})
|
||||
except Exception:
|
||||
logger.exception("Image {url} could not be requested")
|
||||
continue
|
||||
@@ -72,7 +74,7 @@ def scrape_image(image_url: str, slug: str) -> Path:
|
||||
filename = Recipe(slug=slug).image_dir.joinpath(filename)
|
||||
|
||||
try:
|
||||
r = requests.get(image_url, stream=True)
|
||||
r = requests.get(image_url, stream=True, headers={"User-Agent": _FIREFOX_UA})
|
||||
except Exception:
|
||||
logger.exception("Fatal Image Request Exception")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user