"""test LDAP Source""" from typing import Any, Optional from django.db.models import Q from ldap3.core.exceptions import LDAPSessionTerminatedByServerError from authentik.blueprints.tests import apply_blueprint from authentik.core.models import Group, User from authentik.lib.generators import generate_id, generate_key from authentik.sources.ldap.auth import LDAPBackend from authentik.sources.ldap.models import LDAPPropertyMapping, LDAPSource from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer from authentik.sources.ldap.sync.users import UserLDAPSynchronizer from tests.e2e.utils import SeleniumTestCase, retry class TestSourceLDAPSamba(SeleniumTestCase): """test LDAP Source""" def setUp(self): self.admin_password = generate_key() super().setUp() def get_container_specs(self) -> Optional[dict[str, Any]]: return { "image": "ghcr.io/beryju/test-samba-dc:latest", "detach": True, "cap_add": ["SYS_ADMIN"], "ports": { "389": "389/tcp", }, "auto_remove": True, "environment": { "SMB_DOMAIN": "test.goauthentik.io", "SMB_NETBIOS": "goauthentik", "SMB_ADMIN_PASSWORD": self.admin_password, }, } @retry(exceptions=[LDAPSessionTerminatedByServerError]) @apply_blueprint( "system/sources-ldap.yaml", ) def test_source_sync(self): """Test Sync""" source = LDAPSource.objects.create( name=generate_id(), slug=generate_id(), server_uri="ldap://localhost", bind_cn="administrator@test.goauthentik.io", bind_password=self.admin_password, base_dn="dc=test,dc=goauthentik,dc=io", additional_user_dn="ou=users", additional_group_dn="ou=groups", ) source.property_mappings.set( LDAPPropertyMapping.objects.filter( Q(managed__startswith="goauthentik.io/sources/ldap/default-") | Q(managed__startswith="goauthentik.io/sources/ldap/ms-") ) ) source.property_mappings_group.set( LDAPPropertyMapping.objects.filter(name="goauthentik.io/sources/ldap/default-name") ) UserLDAPSynchronizer(source).sync_full() self.assertTrue(User.objects.filter(username="bob").exists()) self.assertTrue(User.objects.filter(username="james").exists()) self.assertTrue(User.objects.filter(username="john").exists()) self.assertTrue(User.objects.filter(username="harry").exists()) @retry(exceptions=[LDAPSessionTerminatedByServerError]) @apply_blueprint( "system/sources-ldap.yaml", ) def test_source_sync_group(self): """Test Sync""" source = LDAPSource.objects.create( name=generate_id(), slug=generate_id(), server_uri="ldap://localhost", bind_cn="administrator@test.goauthentik.io", bind_password=self.admin_password, base_dn="dc=test,dc=goauthentik,dc=io", additional_user_dn="ou=users", additional_group_dn="ou=groups", ) source.property_mappings.set( LDAPPropertyMapping.objects.filter( Q(managed__startswith="goauthentik.io/sources/ldap/default-") | Q(managed__startswith="goauthentik.io/sources/ldap/ms-") ) ) source.property_mappings_group.set( LDAPPropertyMapping.objects.filter(managed="goauthentik.io/sources/ldap/default-name") ) GroupLDAPSynchronizer(source).sync_full() UserLDAPSynchronizer(source).sync_full() MembershipLDAPSynchronizer(source).sync_full() self.assertIsNotNone(User.objects.get(username="bob")) self.assertIsNotNone(User.objects.get(username="james")) self.assertIsNotNone(User.objects.get(username="john")) self.assertIsNotNone(User.objects.get(username="harry")) self.assertIsNotNone(Group.objects.get(name="dev")) self.assertEqual( list(User.objects.get(username="bob").ak_groups.all()), [Group.objects.get(name="dev")] ) self.assertEqual(list(User.objects.get(username="james").ak_groups.all()), []) self.assertEqual( list(User.objects.get(username="john").ak_groups.all().order_by("name")), [Group.objects.get(name="admins"), Group.objects.get(name="dev")], ) self.assertEqual(list(User.objects.get(username="harry").ak_groups.all()), []) @retry(exceptions=[LDAPSessionTerminatedByServerError]) @apply_blueprint( "system/sources-ldap.yaml", ) def test_sync_password(self): """Test Sync""" source = LDAPSource.objects.create( name=generate_id(), slug=generate_id(), server_uri="ldap://localhost", bind_cn="administrator@test.goauthentik.io", bind_password=self.admin_password, base_dn="dc=test,dc=goauthentik,dc=io", additional_user_dn="ou=users", additional_group_dn="ou=groups", ) source.property_mappings.set( LDAPPropertyMapping.objects.filter( Q(managed__startswith="goauthentik.io/sources/ldap/default-") | Q(managed__startswith="goauthentik.io/sources/ldap/ms-") ) ) source.property_mappings_group.set( LDAPPropertyMapping.objects.filter(name="goauthentik.io/sources/ldap/default-name") ) UserLDAPSynchronizer(source).sync_full() username = "bob" password = generate_id() result = self.container.exec_run( ["samba-tool", "user", "setpassword", username, "--newpassword", password] ) self.assertEqual(result.exit_code, 0) user: User = User.objects.get(username=username) # Ensure user has an unusable password directly after sync self.assertFalse(user.has_usable_password()) # Auth (which will fallback to bind) LDAPBackend().auth_user(source, password, username=username) user.refresh_from_db() # User should now have a usable password in the database self.assertTrue(user.has_usable_password()) self.assertTrue(user.check_password(password)) # Set new password new_password = generate_id() result = self.container.exec_run( ["samba-tool", "user", "setpassword", username, "--newpassword", new_password] ) self.assertEqual(result.exit_code, 0) # Sync again UserLDAPSynchronizer(source).sync_full() user.refresh_from_db() # Since password in samba was checked, it should be invalidated here too self.assertFalse(user.has_usable_password())