This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/tests/e2e/test_source_oauth.py

380 lines
14 KiB
Python
Raw Normal View History

"""test OAuth Source"""
from os.path import abspath
2020-09-11 21:21:11 +00:00
from sys import platform
from time import sleep
from typing import Any, Optional
2020-09-11 21:21:11 +00:00
from unittest.case import skipUnless
from django.test import override_settings
from docker.models.containers import Container
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
2021-02-27 23:27:18 +00:00
from selenium.webdriver.support.wait import WebDriverWait
from structlog.stdlib import get_logger
from yaml import safe_dump
2020-12-05 21:08:42 +00:00
from authentik.flows.models import Flow
from authentik.providers.oauth2.generators import (
2020-09-26 16:17:53 +00:00
generate_client_id,
generate_client_secret,
)
2020-12-05 21:08:42 +00:00
from authentik.sources.oauth.models import OAuthSource
2021-02-27 23:27:18 +00:00
from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry
CONFIG_PATH = "/tmp/dex.yml" # nosec
LOGGER = get_logger()
2020-09-11 21:21:11 +00:00
@skipUnless(platform.startswith("linux"), "requires local docker")
2020-09-26 15:44:05 +00:00
class TestSourceOAuth2(SeleniumTestCase):
"""test OAuth Source flow"""
container: Container
def setUp(self):
2020-07-09 12:59:25 +00:00
self.client_secret = generate_client_secret()
2020-09-11 21:21:11 +00:00
self.prepare_dex_config()
super().setUp()
def prepare_dex_config(self):
"""Since Dex does not document which environment
variables can be used to configure clients"""
2020-07-09 21:13:14 +00:00
config = {
"enablePasswordDB": True,
"issuer": "http://127.0.0.1:5556/dex",
"logger": {"level": "debug"},
"staticClients": [
{
"id": "example-app",
"name": "Example App",
"redirectURIs": [
self.url(
2020-12-05 21:08:42 +00:00
"authentik_sources_oauth:oauth-client-callback",
2020-07-09 21:13:14 +00:00
source_slug="dex",
)
],
"secret": self.client_secret,
}
],
"staticPasswords": [
{
"email": "admin@example.com",
# hash for password
"hash": "$2a$10$2b2cU8CPhOTaGrs1HRQuAueS7JTT5ZHsHSzYiFPm1leZck7Mc8T4W",
"userID": "08a8684b-db88-4b73-90a9-3cd1661f5466",
"username": "admin",
}
],
"storage": {"config": {"file": "/tmp/dex.db"}, "type": "sqlite3"}, # nosec
2020-07-09 21:13:14 +00:00
"web": {"http": "0.0.0.0:5556"},
}
with open(CONFIG_PATH, "w+") as _file:
safe_dump(config, _file)
def get_container_specs(self) -> Optional[dict[str, Any]]:
2020-09-11 21:21:11 +00:00
return {
2020-10-27 09:30:08 +00:00
"image": "quay.io/dexidp/dex:v2.24.0",
2020-09-11 21:21:11 +00:00
"detach": True,
"network_mode": "host",
"auto_remove": True,
"command": "serve /config.yml",
"healthcheck": Healthcheck(
test=["CMD", "wget", "--spider", "http://localhost:5556/dex/healthz"],
interval=5 * 100 * 1000000,
start_period=1 * 100 * 1000000,
),
2020-09-11 21:21:11 +00:00
"volumes": {abspath(CONFIG_PATH): {"bind": "/config.yml", "mode": "ro"}},
}
2020-07-09 21:13:14 +00:00
def create_objects(self):
"""Create required objects"""
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
OAuthSource.objects.create( # nosec
name="dex",
slug="dex",
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="openid-connect",
authorization_url="http://127.0.0.1:5556/dex/auth",
access_token_url="http://127.0.0.1:5556/dex/token",
profile_url="http://127.0.0.1:5556/dex/userinfo",
consumer_key="example-app",
2020-07-09 12:59:25 +00:00
consumer_secret=self.client_secret,
)
@retry()
2021-02-27 23:27:18 +00:00
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@apply_migration("authentik_flows", "0009_source_flows")
@apply_migration("authentik_crypto", "0002_create_self_signed_kp")
@object_manager
2020-07-09 21:13:14 +00:00
def test_oauth_enroll(self):
"""test OAuth Source With With OIDC"""
self.create_objects()
self.driver.get(self.live_server_url)
2021-02-27 23:27:18 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
ec.presence_of_element_located(
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
)
)
2021-02-27 23:27:18 +00:00
identification_stage.find_element(
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.ID, "login")))
self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
self.driver.find_element(By.ID, "password").send_keys("password")
self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER)
# Wait until we're logged in
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))
)
self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click()
# At this point we've been redirected back
# and we're asked for the username
2021-02-27 23:27:18 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
prompt_stage = self.get_shadow_root(
"ak-stage-prompt", flow_executor
)
prompt_stage.find_element(
By.CSS_SELECTOR, "input[name=username]"
).click()
prompt_stage.find_element(
By.CSS_SELECTOR, "input[name=username]"
).send_keys("foo")
prompt_stage.find_element(
By.CSS_SELECTOR, "input[name=username]"
).send_keys(Keys.ENTER)
2020-11-23 13:24:42 +00:00
# Wait until we've logged in
2021-02-27 17:48:41 +00:00
self.wait_for_url(self.shell_url("/library"))
self.driver.get(self.url("authentik_core:user-details"))
self.assertEqual(
self.driver.find_element(By.ID, "id_username").get_attribute("value"), "foo"
)
self.assertEqual(
2020-09-30 17:34:22 +00:00
self.driver.find_element(By.ID, "id_name").get_attribute("value"),
"admin",
)
self.assertEqual(
self.driver.find_element(By.ID, "id_email").get_attribute("value"),
"admin@example.com",
)
@retry()
2021-02-27 23:27:18 +00:00
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@apply_migration("authentik_flows", "0009_source_flows")
@apply_migration("authentik_crypto", "0002_create_self_signed_kp")
@object_manager
@override_settings(SESSION_COOKIE_SAMESITE="strict")
def test_oauth_samesite_strict(self):
"""test OAuth Source With SameSite set to strict
(=will fail because session is not carried over)"""
self.create_objects()
self.driver.get(self.live_server_url)
2021-02-27 23:27:18 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
ec.presence_of_element_located(
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
)
)
2021-02-27 23:27:18 +00:00
identification_stage.find_element(
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.ID, "login")))
self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
self.driver.find_element(By.ID, "password").send_keys("password")
self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER)
# Wait until we're logged in
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))
)
self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click()
@retry()
2021-02-27 23:27:18 +00:00
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@apply_migration("authentik_flows", "0009_source_flows")
@apply_migration("authentik_crypto", "0002_create_self_signed_kp")
@object_manager
def test_oauth_enroll_auth(self):
"""test OAuth Source With With OIDC (enroll and authenticate again)"""
self.test_oauth_enroll()
# We're logged in at the end of this, log out and re-login
2020-12-05 21:08:42 +00:00
self.driver.get(self.url("authentik_flows:default-invalidation"))
2021-02-27 23:27:18 +00:00
sleep(1)
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
wait = WebDriverWait(identification_stage, self.wait_timeout)
2021-02-27 23:27:18 +00:00
wait.until(
ec.presence_of_element_located(
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
)
)
2021-02-27 23:27:18 +00:00
identification_stage.find_element(
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
).click()
2021-02-27 23:27:18 +00:00
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.ID, "login")))
self.driver.find_element(By.ID, "login").send_keys("admin@example.com")
self.driver.find_element(By.ID, "password").send_keys("password")
self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER)
# Wait until we're logged in
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "button[type=submit]"))
)
self.driver.find_element(By.CSS_SELECTOR, "button[type=submit]").click()
2020-11-23 13:24:42 +00:00
# Wait until we've logged in
2021-02-27 17:48:41 +00:00
self.wait_for_url(self.shell_url("/library"))
self.driver.get(self.url("authentik_core:user-details"))
self.assertEqual(
self.driver.find_element(By.ID, "id_username").get_attribute("value"), "foo"
)
self.assertEqual(
2020-09-30 17:34:22 +00:00
self.driver.find_element(By.ID, "id_name").get_attribute("value"),
"admin",
)
self.assertEqual(
self.driver.find_element(By.ID, "id_email").get_attribute("value"),
"admin@example.com",
)
2020-09-26 15:44:05 +00:00
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceOAuth1(SeleniumTestCase):
"""Test OAuth1 Source"""
2020-09-26 15:44:05 +00:00
def setUp(self) -> None:
self.client_id = generate_client_id()
self.client_secret = generate_client_secret()
self.source_slug = "oauth1-test"
super().setUp()
def get_container_specs(self) -> Optional[dict[str, Any]]:
2020-09-26 15:44:05 +00:00
return {
"image": "beryju/oauth1-test-server",
2020-09-26 15:44:05 +00:00
"detach": True,
"network_mode": "host",
"auto_remove": True,
"environment": {
"OAUTH1_CLIENT_ID": self.client_id,
"OAUTH1_CLIENT_SECRET": self.client_secret,
"OAUTH1_REDIRECT_URI": (
self.url(
2020-12-05 21:08:42 +00:00
"authentik_sources_oauth:oauth-client-callback",
2020-09-26 15:44:05 +00:00
source_slug=self.source_slug,
)
),
},
}
def create_objects(self):
"""Create required objects"""
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
OAuthSource.objects.create( # nosec
2020-09-26 15:44:05 +00:00
name="oauth1",
slug=self.source_slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="twitter",
request_token_url="http://localhost:5000/oauth/request_token",
access_token_url="http://localhost:5000/oauth/access_token",
authorization_url="http://localhost:5000/oauth/authorize",
profile_url="http://localhost:5000/api/me",
consumer_key=self.client_id,
consumer_secret=self.client_secret,
)
@retry()
2021-02-27 23:27:18 +00:00
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@apply_migration("authentik_flows", "0009_source_flows")
@apply_migration("authentik_crypto", "0002_create_self_signed_kp")
@object_manager
2020-09-26 15:44:05 +00:00
def test_oauth_enroll(self):
"""test OAuth Source With With OIDC"""
self.create_objects()
self.driver.get(self.live_server_url)
2021-02-27 23:27:18 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
2020-09-26 15:44:05 +00:00
ec.presence_of_element_located(
(By.CLASS_NAME, "pf-c-login__main-footer-links-item-link")
)
)
2021-02-27 23:27:18 +00:00
identification_stage.find_element(
2020-09-26 15:44:05 +00:00
By.CLASS_NAME, "pf-c-login__main-footer-links-item-link"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
self.driver.find_element(By.NAME, "username").send_keys("example-user")
self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)
# Wait until we're logged in
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']"))
)
self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()
# Wait until we've loaded the user info page
sleep(2)
2020-11-23 13:24:42 +00:00
# Wait until we've logged in
2021-02-27 17:48:41 +00:00
self.wait_for_url(self.shell_url("/library"))
self.driver.get(self.url("authentik_core:user-details"))
2020-09-26 15:44:05 +00:00
self.assertEqual(
2020-09-26 16:17:53 +00:00
self.driver.find_element(By.ID, "id_username").get_attribute("value"),
"example-user",
2020-09-26 15:44:05 +00:00
)
self.assertEqual(
2020-09-26 16:17:53 +00:00
self.driver.find_element(By.ID, "id_name").get_attribute("value"),
"test name",
2020-09-26 15:44:05 +00:00
)
self.assertEqual(
self.driver.find_element(By.ID, "id_email").get_attribute("value"),
"foo@example.com",
)