"""test SAML Provider flow""" from json import loads from sys import platform from time import sleep from unittest.case import skipUnless from docker import DockerClient, from_env from docker.models.containers import Container from docker.types import Healthcheck from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions as ec from structlog.stdlib import get_logger from authentik.core.models import Application from authentik.crypto.models import CertificateKeyPair from authentik.flows.models import Flow from authentik.policies.expression.models import ExpressionPolicy from authentik.policies.models import PolicyBinding from authentik.providers.saml.models import SAMLBindings, SAMLPropertyMapping, SAMLProvider from tests.e2e.utils import USER, SeleniumTestCase, apply_migration, object_manager, retry LOGGER = get_logger() @skipUnless(platform.startswith("linux"), "requires local docker") class TestProviderSAML(SeleniumTestCase): """test SAML Provider flow""" container: Container def setup_client(self, provider: SAMLProvider) -> Container: """Setup client saml-sp container which we test SAML against""" client: DockerClient = from_env() container = client.containers.run( image="ghcr.io/beryju/saml-test-sp:latest", detach=True, network_mode="host", auto_remove=True, healthcheck=Healthcheck( test=["CMD", "wget", "--spider", "http://localhost:9009/health"], interval=5 * 100 * 1000000, start_period=1 * 100 * 1000000, ), environment={ "SP_ENTITY_ID": provider.issuer, "SP_SSO_BINDING": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", "SP_METADATA_URL": ( self.url( "authentik_api:samlprovider-metadata", pk=provider.pk, ) + "?download" ), }, ) while True: container.reload() status = container.attrs.get("State", {}).get("Health", {}).get("Status") if status == "healthy": return container LOGGER.info("Container failed healthcheck") sleep(1) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0010_provider_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_sp_initiated_implicit(self): """test SAML Provider flow SP-initiated flow (implicit consent)""" # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-implicit-consent" ) provider: SAMLProvider = SAMLProvider.objects.create( name="saml-test", acs_url="http://localhost:9009/saml/acs", audience="authentik-e2e", issuer="authentik-e2e", sp_binding=SAMLBindings.POST, authorization_flow=authorization_flow, signing_kp=CertificateKeyPair.objects.first(), ) provider.property_mappings.set(SAMLPropertyMapping.objects.all()) provider.save() Application.objects.create( name="SAML", slug="authentik-saml", provider=provider, ) self.container = self.setup_client(provider) self.driver.get("http://localhost:9009") self.login() self.wait_for_url("http://localhost:9009/") body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"], [USER().name], ) self.assertEqual( body["attr"][ "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname" ], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/username"], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/uid"], [str(USER().pk)], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"], [USER().email], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn"], [USER().email], ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0010_provider_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_sp_initiated_explicit(self): """test SAML Provider flow SP-initiated flow (explicit consent)""" # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-explicit-consent" ) provider: SAMLProvider = SAMLProvider.objects.create( name="saml-test", acs_url="http://localhost:9009/saml/acs", audience="authentik-e2e", issuer="authentik-e2e", sp_binding=SAMLBindings.POST, authorization_flow=authorization_flow, signing_kp=CertificateKeyPair.objects.first(), ) provider.property_mappings.set(SAMLPropertyMapping.objects.all()) provider.save() app = Application.objects.create( name="SAML", slug="authentik-saml", provider=provider, ) self.container = self.setup_client(provider) self.driver.get("http://localhost:9009") self.login() self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "ak-flow-executor"))) flow_executor = self.get_shadow_root("ak-flow-executor") consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) self.assertIn( app.name, consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text, ) consent_stage.find_element( By.CSS_SELECTOR, ("[type=submit]"), ).click() self.wait_for_url("http://localhost:9009/") body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"], [USER().name], ) self.assertEqual( body["attr"][ "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname" ], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/username"], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/uid"], [str(USER().pk)], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"], [USER().email], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn"], [USER().email], ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0010_provider_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_idp_initiated_implicit(self): """test SAML Provider flow IdP-initiated flow (implicit consent)""" # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-implicit-consent" ) provider: SAMLProvider = SAMLProvider.objects.create( name="saml-test", acs_url="http://localhost:9009/saml/acs", audience="authentik-e2e", issuer="authentik-e2e", sp_binding=SAMLBindings.POST, authorization_flow=authorization_flow, signing_kp=CertificateKeyPair.objects.first(), ) provider.property_mappings.set(SAMLPropertyMapping.objects.all()) provider.save() Application.objects.create( name="SAML", slug="authentik-saml", provider=provider, ) self.container = self.setup_client(provider) self.driver.get( self.url( "authentik_providers_saml:sso-init", application_slug=provider.application.slug, ) ) self.login() sleep(1) self.wait_for_url("http://localhost:9009/") body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name"], [USER().name], ) self.assertEqual( body["attr"][ "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname" ], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/username"], [USER().username], ) self.assertEqual( body["attr"]["http://schemas.goauthentik.io/2021/02/saml/uid"], [str(USER().pk)], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress"], [USER().email], ) self.assertEqual( body["attr"]["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn"], [USER().email], ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0011_flow_title") @apply_migration("authentik_flows", "0010_provider_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @object_manager def test_sp_initiated_denied(self): """test SAML Provider flow SP-initiated flow (Policy denies access)""" # Bootstrap all needed objects authorization_flow = Flow.objects.get( slug="default-provider-authorization-implicit-consent" ) negative_policy = ExpressionPolicy.objects.create( name="negative-static", expression="return False" ) provider: SAMLProvider = SAMLProvider.objects.create( name="saml-test", acs_url="http://localhost:9009/saml/acs", audience="authentik-e2e", issuer="authentik-e2e", sp_binding=SAMLBindings.POST, authorization_flow=authorization_flow, signing_kp=CertificateKeyPair.objects.first(), ) provider.property_mappings.set(SAMLPropertyMapping.objects.all()) provider.save() app = Application.objects.create( name="SAML", slug="authentik-saml", provider=provider, ) PolicyBinding.objects.create(target=app, policy=negative_policy, order=0) self.container = self.setup_client(provider) self.driver.get("http://localhost:9009/") self.login() self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "header > h1"))) self.assertEqual( self.driver.find_element(By.CSS_SELECTOR, "header > h1").text, "Permission denied", )