mirror of
				https://github.com/mealie-recipes/mealie.git
				synced 2025-11-03 18:53:17 -05:00 
			
		
		
		
	* initial oidc implementation * add dynamic scheme * e2e test setup * add caching * fix * try this * add libldap-2.5 to runtime dependencies (#2849) * New translations en-us.json (Norwegian) (#2851) * New Crowdin updates (#2855) * New translations en-us.json (Italian) * New translations en-us.json (Norwegian) * New translations en-us.json (Portuguese) * fix * remove cache * cache yarn deps * cache docker image * cleanup action * lint * fix tests * remove not needed variables * run code gen * fix tests * add docs * move code into custom scheme * remove unneeded type * fix oidc admin * add more tests * add better spacing on login page * create auth providers * clean up testing stuff * type fixes * add OIDC auth method to postgres enum * add option to bypass login screen and go directly to iDP * remove check so we can fallback to another auth method oauth fails * Add provider name to be shown at the login screen * add new properties to admin about api * fix spec * add a prompt to change auth method when changing password * Create new auth section. Add more info on auth methods * update docs * run ruff * update docs * format * docs gen * formatting * initialize logger in class * mypy type fixes * docs gen * add models to get proper fields in docs and fix serialization * validate id token before using it * only request a mealie token on initial callback * remove unused method * fix unit tests * docs gen * check for valid idToken before getting token * add iss to mealie token * check to see if we already have a mealie token before getting one * fix lock file * update authlib * update lock file * add remember me environment variable * add user group setting to allow only certain groups to log in --------- Co-authored-by: Carter Mintey <cmintey8@gmail.com> Co-authored-by: Carter <35710697+cmintey@users.noreply.github.com>
		
			
				
	
	
		
			251 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			251 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from pathlib import Path
 | 
						|
 | 
						|
import ldap
 | 
						|
from pytest import MonkeyPatch
 | 
						|
 | 
						|
from mealie.core import security
 | 
						|
from mealie.core.config import get_app_settings
 | 
						|
from mealie.core.dependencies import validate_file_token
 | 
						|
from mealie.core.security.providers.credentials_provider import CredentialsProvider, CredentialsRequest
 | 
						|
from mealie.core.security.providers.ldap_provider import LDAPProvider
 | 
						|
from mealie.db.db_setup import session_context
 | 
						|
from mealie.db.models.users.users import AuthMethod
 | 
						|
from mealie.schema.user.auth import CredentialsRequestForm
 | 
						|
from mealie.schema.user.user import PrivateUser
 | 
						|
from tests.utils import random_string
 | 
						|
 | 
						|
 | 
						|
class LdapConnMock:
 | 
						|
    def __init__(self, user, password, admin, query_bind, query_password, mail, name) -> None:
 | 
						|
        self.app_settings = get_app_settings()
 | 
						|
        self.user = user
 | 
						|
        self.password = password
 | 
						|
        self.query_bind = query_bind
 | 
						|
        self.query_password = query_password
 | 
						|
        self.admin = admin
 | 
						|
        self.mail = mail
 | 
						|
        self.name = name
 | 
						|
 | 
						|
    def simple_bind_s(self, dn, bind_pw):
 | 
						|
        if dn == "cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN):
 | 
						|
            valid_password = self.password
 | 
						|
        elif "cn={}, {}".format(self.query_bind, self.app_settings.LDAP_BASE_DN):
 | 
						|
            valid_password = self.query_password
 | 
						|
 | 
						|
        if bind_pw == valid_password:
 | 
						|
            return
 | 
						|
 | 
						|
        raise ldap.INVALID_CREDENTIALS
 | 
						|
 | 
						|
    # Default search mock implementation
 | 
						|
    def search_s(self, dn, scope, filter, attrlist):
 | 
						|
        if filter == self.app_settings.LDAP_ADMIN_FILTER:
 | 
						|
            assert attrlist == []
 | 
						|
            assert filter == self.app_settings.LDAP_ADMIN_FILTER
 | 
						|
            assert dn == "cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN)
 | 
						|
            assert scope == ldap.SCOPE_BASE
 | 
						|
 | 
						|
            if not self.admin:
 | 
						|
                return []
 | 
						|
 | 
						|
            return [(dn, {})]
 | 
						|
 | 
						|
        assert attrlist == [
 | 
						|
            self.app_settings.LDAP_ID_ATTRIBUTE,
 | 
						|
            self.app_settings.LDAP_NAME_ATTRIBUTE,
 | 
						|
            self.app_settings.LDAP_MAIL_ATTRIBUTE,
 | 
						|
        ]
 | 
						|
        user_filter = self.app_settings.LDAP_USER_FILTER.format(
 | 
						|
            id_attribute=self.app_settings.LDAP_ID_ATTRIBUTE,
 | 
						|
            mail_attribute=self.app_settings.LDAP_MAIL_ATTRIBUTE,
 | 
						|
            input=self.user,
 | 
						|
        )
 | 
						|
        search_filter = "(&(|({id_attribute}={input})({mail_attribute}={input})){filter})".format(
 | 
						|
            id_attribute=self.app_settings.LDAP_ID_ATTRIBUTE,
 | 
						|
            mail_attribute=self.app_settings.LDAP_MAIL_ATTRIBUTE,
 | 
						|
            input=self.user,
 | 
						|
            filter=user_filter,
 | 
						|
        )
 | 
						|
        assert filter == search_filter
 | 
						|
        assert dn == self.app_settings.LDAP_BASE_DN
 | 
						|
        assert scope == ldap.SCOPE_SUBTREE
 | 
						|
 | 
						|
        return [
 | 
						|
            (
 | 
						|
                "cn={}, {}".format(self.user, self.app_settings.LDAP_BASE_DN),
 | 
						|
                {
 | 
						|
                    self.app_settings.LDAP_ID_ATTRIBUTE: [self.user.encode()],
 | 
						|
                    self.app_settings.LDAP_NAME_ATTRIBUTE: [self.name.encode()],
 | 
						|
                    self.app_settings.LDAP_MAIL_ATTRIBUTE: [self.mail.encode()],
 | 
						|
                },
 | 
						|
            )
 | 
						|
        ]
 | 
						|
 | 
						|
    def set_option(self, option, invalue):
 | 
						|
        pass
 | 
						|
 | 
						|
    def unbind_s(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def start_tls_s(self):
 | 
						|
        pass
 | 
						|
 | 
						|
 | 
						|
def setup_env(monkeypatch: MonkeyPatch):
 | 
						|
    user = random_string(10)
 | 
						|
    mail = random_string(10)
 | 
						|
    name = random_string(10)
 | 
						|
    password = random_string(10)
 | 
						|
    query_bind = random_string(10)
 | 
						|
    query_password = random_string(10)
 | 
						|
    base_dn = "(dc=example,dc=com)"
 | 
						|
    monkeypatch.setenv("LDAP_AUTH_ENABLED", "true")
 | 
						|
    monkeypatch.setenv("LDAP_SERVER_URL", "")  # Not needed due to mocking
 | 
						|
    monkeypatch.setenv("LDAP_BASE_DN", base_dn)
 | 
						|
    monkeypatch.setenv("LDAP_QUERY_BIND", query_bind)
 | 
						|
    monkeypatch.setenv("LDAP_QUERY_PASSWORD", query_password)
 | 
						|
    monkeypatch.setenv("LDAP_USER_FILTER", "(&(objectClass=user)(|({id_attribute}={input})({mail_attribute}={input})))")
 | 
						|
 | 
						|
    return user, mail, name, password, query_bind, query_password
 | 
						|
 | 
						|
 | 
						|
def test_create_file_token():
 | 
						|
    file_path = Path(__file__).parent
 | 
						|
    file_token = security.create_file_token(file_path)
 | 
						|
 | 
						|
    assert file_path == validate_file_token(file_token)
 | 
						|
 | 
						|
 | 
						|
def get_provider(session, username: str, password: str):
 | 
						|
    request_data = CredentialsRequest(username=username, password=password)
 | 
						|
    return LDAPProvider(session, request_data)
 | 
						|
 | 
						|
 | 
						|
def test_ldap_user_creation(monkeypatch: MonkeyPatch):
 | 
						|
    user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
 | 
						|
 | 
						|
    def ldap_initialize_mock(url):
 | 
						|
        assert url == ""
 | 
						|
        return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
 | 
						|
 | 
						|
    monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        provider = get_provider(session, user, password)
 | 
						|
        result = provider.get_user()
 | 
						|
 | 
						|
    assert result
 | 
						|
    assert result.username == user
 | 
						|
    assert result.email == mail
 | 
						|
    assert result.full_name == name
 | 
						|
    assert result.admin is False
 | 
						|
 | 
						|
 | 
						|
def test_ldap_user_creation_fail(monkeypatch: MonkeyPatch):
 | 
						|
    user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
 | 
						|
 | 
						|
    def ldap_initialize_mock(url):
 | 
						|
        assert url == ""
 | 
						|
        return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
 | 
						|
 | 
						|
    monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        provider = get_provider(session, user, password + "a")
 | 
						|
        result = provider.get_user()
 | 
						|
 | 
						|
    assert result is None
 | 
						|
 | 
						|
 | 
						|
def test_ldap_user_creation_non_admin(monkeypatch: MonkeyPatch):
 | 
						|
    user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
 | 
						|
    monkeypatch.setenv("LDAP_ADMIN_FILTER", "(memberOf=cn=admins,dc=example,dc=com)")
 | 
						|
 | 
						|
    def ldap_initialize_mock(url):
 | 
						|
        assert url == ""
 | 
						|
        return LdapConnMock(user, password, False, query_bind, query_password, mail, name)
 | 
						|
 | 
						|
    monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        provider = get_provider(session, user, password)
 | 
						|
        result = provider.get_user()
 | 
						|
 | 
						|
    assert result
 | 
						|
    assert result.username == user
 | 
						|
    assert result.email == mail
 | 
						|
    assert result.full_name == name
 | 
						|
    assert result.admin is False
 | 
						|
 | 
						|
 | 
						|
def test_ldap_user_creation_admin(monkeypatch: MonkeyPatch):
 | 
						|
    user, mail, name, password, query_bind, query_password = setup_env(monkeypatch)
 | 
						|
    monkeypatch.setenv("LDAP_ADMIN_FILTER", "(memberOf=cn=admins,dc=example,dc=com)")
 | 
						|
 | 
						|
    def ldap_initialize_mock(url):
 | 
						|
        assert url == ""
 | 
						|
        return LdapConnMock(user, password, True, query_bind, query_password, mail, name)
 | 
						|
 | 
						|
    monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        provider = get_provider(session, user, password)
 | 
						|
        result = provider.get_user()
 | 
						|
 | 
						|
    assert result
 | 
						|
    assert result.username == user
 | 
						|
    assert result.email == mail
 | 
						|
    assert result.full_name == name
 | 
						|
    assert result.admin
 | 
						|
 | 
						|
 | 
						|
def test_ldap_disabled(monkeypatch: MonkeyPatch):
 | 
						|
    monkeypatch.setenv("LDAP_AUTH_ENABLED", "False")
 | 
						|
 | 
						|
    class Request:
 | 
						|
        def __init__(self, auth_strategy: str):
 | 
						|
            self.cookies = {"mealie.auth.strategy": auth_strategy}
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        form = CredentialsRequestForm("username", "password", False)
 | 
						|
        provider = security.get_auth_provider(session, Request("local"), form)
 | 
						|
 | 
						|
    assert isinstance(provider, CredentialsProvider)
 | 
						|
 | 
						|
 | 
						|
def test_user_login_ldap_auth_method(monkeypatch: MonkeyPatch, ldap_user: PrivateUser):
 | 
						|
    """
 | 
						|
    Test login from a user who was originally created in Mealie, but has since been converted
 | 
						|
    to LDAP auth method
 | 
						|
    """
 | 
						|
    _, _, name, ldap_password, query_bind, query_password = setup_env(monkeypatch)
 | 
						|
 | 
						|
    def ldap_initialize_mock(url):
 | 
						|
        assert url == ""
 | 
						|
        return LdapConnMock(ldap_user.username, ldap_password, False, query_bind, query_password, ldap_user.email, name)
 | 
						|
 | 
						|
    monkeypatch.setattr(ldap, "initialize", ldap_initialize_mock)
 | 
						|
 | 
						|
    get_app_settings.cache_clear()
 | 
						|
 | 
						|
    with session_context() as session:
 | 
						|
        provider = get_provider(session, ldap_user.username, ldap_password)
 | 
						|
        result = provider.get_user()
 | 
						|
 | 
						|
    assert result
 | 
						|
    assert result.username == ldap_user.username
 | 
						|
    assert result.email == ldap_user.email
 | 
						|
    assert result.full_name == ldap_user.full_name
 | 
						|
    assert result.admin == ldap_user.admin
 | 
						|
    assert result.auth_method == AuthMethod.LDAP
 |