"""test OAuth Source"""
from os.path import abspath
from sys import platform
from time import sleep
from typing import Any, Dict, Optional
from unittest.case import skipUnless

from django.test import override_settings
from docker.models.containers import Container
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from structlog.stdlib import get_logger
from yaml import safe_dump

from authentik.flows.models import Flow
from authentik.providers.oauth2.generators import (
    generate_client_id,
    generate_client_secret,
)
from authentik.sources.oauth.models import OAuthSource
from tests.e2e.utils import SeleniumTestCase, retry

CONFIG_PATH = "/tmp/dex.yml"  # nosec
LOGGER = get_logger()


class _SourceBrowserStepsMixin:
    """Browser steps shared by the OAuth source test cases in this module.

    Expects to be mixed into a class that provides ``self.driver``,
    ``self.wait``, ``self.url`` and the ``assertEqual`` method."""

    # CSS class of the link on the login page that the tests click to start
    # the login through the configured source.
    SOURCE_LINK_CLASS = "pf-c-login__main-footer-links-item-link"

    def _open_source_login(self, pause: float = 0) -> None:
        """Wait for the source link on the login page and click it.

        ``pause`` is an optional number of seconds to sleep both before and
        after the click; with the default of 0 no sleep is performed."""
        self.wait.until(
            ec.presence_of_element_located((By.CLASS_NAME, self.SOURCE_LINK_CLASS))
        )
        if pause:
            sleep(pause)
        self.driver.find_element(By.CLASS_NAME, self.SOURCE_LINK_CLASS).click()
        if pause:
            sleep(pause)

    def _assert_user_details(self, username: str, name: str, email: str) -> None:
        """Open the user-details page and compare the displayed field values."""
        self.driver.get(self.url("authentik_core:user-details"))
        self.assertEqual(
            self.driver.find_element(By.ID, "id_username").get_attribute("value"),
            username,
        )
        self.assertEqual(
            self.driver.find_element(By.ID, "id_name").get_attribute("value"),
            name,
        )
        self.assertEqual(
            self.driver.find_element(By.ID, "id_email").get_attribute("value"),
            email,
        )


@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceOAuth2(_SourceBrowserStepsMixin, SeleniumTestCase):
    """test OAuth Source flow"""

    # NOTE(review): presumably set by SeleniumTestCase from the result of
    # get_container_specs() - confirm against tests.e2e.utils.
    container: Container

    def setUp(self) -> None:
        # The secret and the config file are created before super().setUp();
        # presumably that call starts the container which mounts the file.
        self.client_secret = generate_client_secret()
        self.prepare_dex_config()
        super().setUp()

    def prepare_dex_config(self):
        """Write the Dex configuration to CONFIG_PATH.

        Since Dex does not document which environment variables can be used
        to configure clients, the static client and the static user are
        configured through a YAML config file instead."""
        config = {
            "enablePasswordDB": True,
            "issuer": "http://127.0.0.1:5556/dex",
            "logger": {"level": "debug"},
            "staticClients": [
                {
                    "id": "example-app",
                    "name": "Example App",
                    "redirectURIs": [
                        self.url(
                            "authentik_sources_oauth:oauth-client-callback",
                            source_slug="dex",
                        )
                    ],
                    "secret": self.client_secret,
                }
            ],
            "staticPasswords": [
                {
                    "email": "admin@example.com",
                    # hash for password
                    "hash": "$2a$10$2b2cU8CPhOTaGrs1HRQuAueS7JTT5ZHsHSzYiFPm1leZck7Mc8T4W",
                    "userID": "08a8684b-db88-4b73-90a9-3cd1661f5466",
                    "username": "admin",
                }
            ],
            "storage": {"config": {"file": "/tmp/dex.db"}, "type": "sqlite3"},  # nosec
            "web": {"http": "0.0.0.0:5556"},
        }
        # Write-only access is sufficient; the encoding is pinned so the
        # result does not depend on the platform default.
        with open(CONFIG_PATH, "w", encoding="utf-8") as _file:
            safe_dump(config, _file)

    def get_container_specs(self) -> Optional[Dict[str, Any]]:
        """Return the docker run arguments for the Dex container."""
        return {
            "image": "quay.io/dexidp/dex:v2.24.0",
            "detach": True,
            "network_mode": "host",
            "auto_remove": True,
            "command": "serve /config.yml",
            "healthcheck": Healthcheck(
                test=["CMD", "wget", "--spider", "http://localhost:5556/dex/healthz"],
                # Durations are given in nanoseconds:
                # 500ms between checks, 100ms start period.
                interval=5 * 100 * 1000000,
                start_period=1 * 100 * 1000000,
            ),
            # Mount the config written by prepare_dex_config() read-only at the
            # path passed to "serve" above.
            "volumes": {abspath(CONFIG_PATH): {"bind": "/config.yml", "mode": "ro"}},
        }

    def create_objects(self):
        """Create required objects"""
        # Bootstrap all needed objects
        authentication_flow = Flow.objects.get(slug="default-source-authentication")
        enrollment_flow = Flow.objects.get(slug="default-source-enrollment")

        OAuthSource.objects.create(  # nosec
            name="dex",
            slug="dex",
            authentication_flow=authentication_flow,
            enrollment_flow=enrollment_flow,
            provider_type="openid-connect",
            authorization_url="http://127.0.0.1:5556/dex/auth",
            access_token_url="http://127.0.0.1:5556/dex/token",
            profile_url="http://127.0.0.1:5556/dex/userinfo",
            consumer_key="example-app",
            consumer_secret=self.client_secret,
        )

    def _login_at_dex(self) -> None:
        """Log in at Dex with the static user and submit the following form."""
        # Now we should be at the IDP, wait for the login field
        self.wait.until(ec.presence_of_element_located((By.ID, "login")))
        self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
        self.driver.find_element(By.ID, "password").send_keys("password")
        self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER)

        # Wait until we're logged in
        # NOTE(review): the submit button presumably belongs to Dex's grant
        # approval page - confirm.
        self.wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))
        )
        self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click()

    @retry()
    def test_oauth_enroll(self):
        """test OAuth Source with OIDC (enrollment)"""
        self.create_objects()
        self.driver.get(self.live_server_url)

        self._open_source_login()
        self._login_at_dex()

        # At this point we've been redirected back
        # and we're asked for the username
        self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
        self.driver.find_element(By.NAME, "username").click()
        self.driver.find_element(By.NAME, "username").send_keys("foo")
        self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)

        # Wait until we've logged in
        self.wait_for_url(self.shell_url("authentik_core:overview"))
        self._assert_user_details("foo", "admin", "admin@example.com")

    @retry()
    @override_settings(SESSION_COOKIE_SAMESITE="strict")
    def test_oauth_samesite_strict(self):
        """test OAuth Source With SameSite set to strict
        (=will fail because session is not carried over)"""
        self.create_objects()
        self.driver.get(self.live_server_url)

        self._open_source_login()
        self._login_at_dex()

        # Instead of being enrolled we expect an error alert
        self.wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, ".pf-c-alert__title"))
        )
        self.assertEqual(
            self.driver.find_element(By.CSS_SELECTOR, ".pf-c-alert__title").text,
            "Authentication Failed.",
        )

    @retry()
    def test_oauth_enroll_auth(self):
        """test OAuth Source with OIDC (enroll and authenticate again)"""
        self.test_oauth_enroll()
        # We're logged in at the end of this, log out and re-login
        self.driver.get(self.url("authentik_flows:default-invalidation"))

        # One second of settling time around the click, as before
        self._open_source_login(pause=1)
        self._login_at_dex()

        # Wait until we've logged in
        self.wait_for_url(self.shell_url("authentik_core:overview"))
        self._assert_user_details("foo", "admin", "admin@example.com")


@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceOAuth1(_SourceBrowserStepsMixin, SeleniumTestCase):
    """Test OAuth1 Source"""

    def setUp(self) -> None:
        # Credentials and slug are needed by get_container_specs(), so they
        # are set before super().setUp() runs.
        self.client_id = generate_client_id()
        self.client_secret = generate_client_secret()
        self.source_slug = "oauth1-test"
        super().setUp()

    def get_container_specs(self) -> Optional[Dict[str, Any]]:
        """Return the docker run arguments for the OAuth1 test server."""
        return {
            "image": "beryju/oauth1-test-server",
            "detach": True,
            "network_mode": "host",
            "auto_remove": True,
            "environment": {
                "OAUTH1_CLIENT_ID": self.client_id,
                "OAUTH1_CLIENT_SECRET": self.client_secret,
                "OAUTH1_REDIRECT_URI": (
                    self.url(
                        "authentik_sources_oauth:oauth-client-callback",
                        source_slug=self.source_slug,
                    )
                ),
            },
        }

    def create_objects(self):
        """Create required objects"""
        # Bootstrap all needed objects
        authentication_flow = Flow.objects.get(slug="default-source-authentication")
        enrollment_flow = Flow.objects.get(slug="default-source-enrollment")

        OAuthSource.objects.create(  # nosec
            name="oauth1",
            slug=self.source_slug,
            authentication_flow=authentication_flow,
            enrollment_flow=enrollment_flow,
            provider_type="twitter",
            request_token_url="http://localhost:5000/oauth/request_token",
            access_token_url="http://localhost:5000/oauth/access_token",
            authorization_url="http://localhost:5000/oauth/authorize",
            profile_url="http://localhost:5000/api/me",
            consumer_key=self.client_id,
            consumer_secret=self.client_secret,
        )

    @retry()
    def test_oauth_enroll(self):
        """test OAuth Source with OAuth1 (enrollment)"""
        self.create_objects()
        self.driver.get(self.live_server_url)

        self._open_source_login()

        # Now we should be at the IDP, wait for the login field
        self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
        self.driver.find_element(By.NAME, "username").send_keys("example-user")
        self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)

        # Wait until we're logged in
        self.wait.until(
            ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']"))
        )
        self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()

        # Wait until we've loaded the user info page
        sleep(2)
        # Wait until we've logged in
        self.wait_for_url(self.shell_url("authentik_core:overview"))
        self._assert_user_details("example-user", "test name", "foo@example.com")