"""LDAP and Outpost e2e tests""" from sys import platform from time import sleep from unittest.case import skipUnless from docker.client import DockerClient, from_env from docker.models.containers import Container from guardian.shortcuts import get_anonymous_user from ldap3 import ( ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server, ) from ldap3.core.exceptions import LDAPInsufficientAccessRightsResult from authentik.core.models import Application, Group, User from authentik.events.models import Event, EventAction from authentik.flows.models import Flow from authentik.outposts.models import Outpost, OutpostType from authentik.providers.ldap.models import LDAPProvider from tests.e2e.utils import ( USER, SeleniumTestCase, apply_migration, object_manager, retry, ) @skipUnless(platform.startswith("linux"), "requires local docker") class TestProviderLDAP(SeleniumTestCase): """LDAP and Outpost e2e tests""" ldap_container: Container def tearDown(self) -> None: super().tearDown() self.output_container_logs(self.ldap_container) self.ldap_container.kill() def start_ldap(self, outpost: Outpost) -> Container: """Start ldap container based on outpost created""" client: DockerClient = from_env() container = client.containers.run( image="beryju.org/authentik/outpost-ldap:gh-master", detach=True, network_mode="host", auto_remove=True, environment={ "AUTHENTIK_HOST": self.live_server_url, "AUTHENTIK_TOKEN": outpost.token.key, }, ) return container def _prepare(self) -> User: """prepare user, provider, app and container""" # set additionalHeaders to test later user = USER() user.attributes["extraAttribute"] = "bar" user.save() ldap: LDAPProvider = LDAPProvider.objects.create( name="ldap_provider", authorization_flow=Flow.objects.get(slug="default-authentication-flow"), search_group=Group.objects.first(), ) # we need to create an application to actually access the ldap Application.objects.create(name="ldap", slug="ldap", provider=ldap) outpost: Outpost = Outpost.objects.create( name="ldap_outpost", type=OutpostType.LDAP, ) outpost.providers.add(ldap) outpost.save() user = outpost.user self.ldap_container = self.start_ldap(outpost) # Wait until outpost healthcheck succeeds healthcheck_retries = 0 while healthcheck_retries < 50: if len(outpost.state) > 0: state = outpost.state[0] if state.last_seen: break healthcheck_retries += 1 sleep(0.5) return user @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @object_manager def test_ldap_bind_success(self): """Test simple bind""" self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", password=USER().username, ) _connection.bind() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN, user={ "pk": USER().pk, "email": USER().email, "username": USER().username, }, ) ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @object_manager def test_ldap_bind_fail(self): """Test simple bind (failed)""" self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", password=USER().username + "fqwerwqer", ) with self.assertRaises(LDAPInsufficientAccessRightsResult): _connection.bind() anon = get_anonymous_user() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN_FAILED, user={"pk": anon.pk, "email": anon.email, "username": anon.username}, ) ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_core", "0009_group_is_superuser") @apply_migration("authentik_flows", "0008_default_flows") @object_manager def test_ldap_bind_search(self): """Test simple bind + search""" outpost_user = self._prepare() server = Server("ldap://localhost:3389", get_info=ALL) _connection = Connection( server, raise_exceptions=True, user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", password=USER().username, ) _connection.bind() self.assertTrue( Event.objects.filter( action=EventAction.LOGIN, user={ "pk": USER().pk, "email": USER().email, "username": USER().username, }, ) ) _connection.search( "ou=users,dc=ldap,dc=goauthentik,dc=io", "(objectClass=user)", search_scope=SUBTREE, attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES], ) response = _connection.response # Remove raw_attributes to make checking easier for obj in response: del obj["raw_attributes"] del obj["raw_dn"] self.assertCountEqual( response, [ { "dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", "attributes": { "cn": [outpost_user.username], "uid": [outpost_user.uid], "name": [""], "displayName": [""], "mail": [""], "objectClass": [ "user", "organizationalPerson", "goauthentik.io/ldap/user", ], "uidNumber": [str(2000 + outpost_user.pk)], "gidNumber": [str(2000 + outpost_user.pk)], "memberOf": [], "accountStatus": ["true"], "superuser": ["false"], "goauthentik.io/ldap/active": ["true"], "goauthentik.io/ldap/superuser": ["false"], "goauthentik.io/user/override-ips": ["true"], "goauthentik.io/user/service-account": ["true"], }, "type": "searchResEntry", }, { "dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", "attributes": { "cn": [USER().username], "uid": [USER().uid], "name": [USER().name], "displayName": [USER().name], "mail": [USER().email], "objectClass": [ "user", "organizationalPerson", "goauthentik.io/ldap/user", ], "uidNumber": [str(2000 + USER().pk)], "gidNumber": [str(2000 + USER().pk)], "memberOf": [ "cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io" ], "accountStatus": ["true"], "superuser": ["true"], "goauthentik.io/ldap/active": ["true"], "goauthentik.io/ldap/superuser": ["true"], "extraAttribute": ["bar"], }, "type": "searchResEntry", }, ], )