From 4280847bcc6de0fd512c4cead4bb49f382421abc Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Thu, 1 Jul 2021 11:51:00 +0200 Subject: [PATCH] tests/e2e: add LDAP bind and search tests Signed-off-by: Jens Langhammer --- tests/e2e/test_provider_ldap.py | 227 ++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 tests/e2e/test_provider_ldap.py diff --git a/tests/e2e/test_provider_ldap.py b/tests/e2e/test_provider_ldap.py new file mode 100644 index 000000000..f54765489 --- /dev/null +++ b/tests/e2e/test_provider_ldap.py @@ -0,0 +1,227 @@ +"""LDAP and Outpost e2e tests""" +from sys import platform +from time import sleep +from unittest.case import skipUnless + +from docker.client import DockerClient, from_env +from docker.models.containers import Container +from guardian.shortcuts import get_anonymous_user +from ldap3 import ( + ALL, + ALL_ATTRIBUTES, + ALL_OPERATIONAL_ATTRIBUTES, + SUBTREE, + Connection, + Server, +) +from ldap3.core.exceptions import LDAPInsufficientAccessRightsResult + +from authentik.core.models import Application, Group, User +from authentik.events.models import Event, EventAction +from authentik.flows.models import Flow +from authentik.outposts.models import Outpost, OutpostType +from authentik.providers.ldap.models import LDAPProvider +from tests.e2e.utils import ( + USER, + SeleniumTestCase, + apply_migration, + object_manager, + retry, +) + + +@skipUnless(platform.startswith("linux"), "requires local docker") +class TestProviderLDAP(SeleniumTestCase): + """LDAP and Outpost e2e tests""" + + ldap_container: Container + + def tearDown(self) -> None: + super().tearDown() + self.output_container_logs(self.ldap_container) + self.ldap_container.kill() + + def start_ldap(self, outpost: Outpost) -> Container: + """Start ldap container based on outpost created""" + client: DockerClient = from_env() + container = client.containers.run( + image="beryju.org/authentik/outpost-ldap:gh-master", + detach=True, + network_mode="host", + auto_remove=True, + environment={ + "AUTHENTIK_HOST": self.live_server_url, + "AUTHENTIK_TOKEN": outpost.token.key, + }, + ) + return container + + def _prepare(self) -> User: + """prepare user, provider, app and container""" + # set additionalHeaders to test later + user = USER() + user.attributes["extraAttribute"] = "bar" + user.save() + + ldap: LDAPProvider = LDAPProvider.objects.create( + name="ldap_provider", + authorization_flow=Flow.objects.get(slug="default-authentication-flow"), + search_group=Group.objects.first(), + ) + # we need to create an application to actually access the ldap + Application.objects.create(name="ldap", slug="ldap", provider=ldap) + outpost: Outpost = Outpost.objects.create( + name="ldap_outpost", + type=OutpostType.LDAP, + ) + outpost.providers.add(ldap) + outpost.save() + user = outpost.user + + self.ldap_container = self.start_ldap(outpost) + + # Wait until outpost healthcheck succeeds + healthcheck_retries = 0 + while healthcheck_retries < 50: + if len(outpost.state) > 0: + state = outpost.state[0] + if state.last_seen: + break + healthcheck_retries += 1 + sleep(0.5) + return user + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @object_manager + def test_ldap_bind_success(self): + """Test simple bind""" + self._prepare() + server = Server("ldap://localhost:3389", get_info=ALL) + _connection = Connection( + server, + raise_exceptions=True, + user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", + password=USER().username, + ) + _connection.bind() + self.assertTrue( + Event.objects.filter( + action=EventAction.LOGIN, + user={ + "pk": USER().pk, + "email": USER().email, + "username": USER().username, + }, + ) + ) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @object_manager + def test_ldap_bind_fail(self): + """Test simple bind (failed)""" + self._prepare() + server = Server("ldap://localhost:3389", get_info=ALL) + _connection = Connection( + server, + raise_exceptions=True, + user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io", + password=USER().username + "fqwerwqer", + ) + with self.assertRaises(LDAPInsufficientAccessRightsResult): + _connection.bind() + anon = get_anonymous_user() + self.assertTrue( + Event.objects.filter( + action=EventAction.LOGIN_FAILED, + user={"pk": anon.pk, "email": anon.email, "username": anon.username}, + ) + ) + + @retry() + @apply_migration("authentik_core", "0003_default_user") + @apply_migration("authentik_flows", "0008_default_flows") + @object_manager + def test_ldap_bind_search(self): + """Test simple bind + search""" + outpost_user = self._prepare() + server = Server("ldap://localhost:3389", get_info=ALL) + _connection = Connection( + server, + raise_exceptions=True, + user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", + password=USER().username, + ) + _connection.bind() + self.assertTrue( + Event.objects.filter( + action=EventAction.LOGIN, + user={ + "pk": USER().pk, + "email": USER().email, + "username": USER().username, + }, + ) + ) + _connection.search( + "ou=users,dc=ldap,dc=goauthentik,dc=io", + "(objectClass=user)", + search_scope=SUBTREE, + attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES], + ) + response = _connection.response + # Remove raw_attributes to make checking easier + for obj in response: + del obj["raw_attributes"] + del obj["raw_dn"] + self.assertCountEqual( + response, + [ + { + "dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", + "attributes": { + "cn": [outpost_user.username], + "uid": [outpost_user.uid], + "name": [""], + "displayName": [""], + "mail": [""], + "objectClass": [ + "user", + "organizationalPerson", + "goauthentik.io/ldap/user", + ], + "memberOf": [], + "goauthentik.io/ldap/active": ["true"], + "goauthentik.io/ldap/superuser": ["false"], + "goauthentik.io/user/override-ips": ["true"], + "goauthentik.io/user/service-account": ["true"], + }, + "type": "searchResEntry", + }, + { + "dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io", + "attributes": { + "cn": [USER().username], + "uid": [USER().uid], + "name": [USER().name], + "displayName": [USER().name], + "mail": [USER().email], + "objectClass": [ + "user", + "organizationalPerson", + "goauthentik.io/ldap/user", + ], + "memberOf": [ + "cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io" + ], + "goauthentik.io/ldap/active": ["true"], + "goauthentik.io/ldap/superuser": ["true"], + "extraAttribute": ["bar"], + }, + "type": "searchResEntry", + }, + ], + )