This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/tests/e2e/test_flows_authenticators.py

216 lines
8.0 KiB
Python
Raw Normal View History

"""test flow with otp stages"""
from base64 import b32decode
from sys import platform
from time import sleep
from unittest.case import skipUnless
from urllib.parse import parse_qs, urlparse
from django_otp.oath import TOTP
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from django_otp.plugins.otp_totp.models import TOTPDevice
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
2021-02-25 21:54:39 +00:00
from selenium.webdriver.support.wait import WebDriverWait
2020-12-05 21:08:42 +00:00
from authentik.flows.models import Flow, FlowStageBinding
from authentik.stages.authenticator_static.models import AuthenticatorStaticStage
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage
from tests.e2e.utils import USER, SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestFlowsAuthenticator(SeleniumTestCase):
"""test flow with otp stages"""
@retry()
def test_totp_validate(self):
"""test flow with otp stages"""
sleep(1)
# Setup TOTP Device
user = USER()
device = TOTPDevice.objects.create(user=user, confirmed=True, digits=6)
flow: Flow = Flow.objects.get(slug="default-authentication-flow")
FlowStageBinding.objects.create(
target=flow, order=30, stage=AuthenticatorValidateStage.objects.create()
)
self.driver.get(f"{self.live_server_url}/flows/{flow.slug}/")
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).click()
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(USER().username)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(Keys.ENTER)
flow_executor = self.get_shadow_root("ak-flow-executor")
password_stage = self.get_shadow_root("ak-stage-password", flow_executor)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
USER().username
)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
Keys.ENTER
)
# Get expected token
totp = TOTP(device.bin_key, device.step, device.t0, device.digits, device.drift)
flow_executor = self.get_shadow_root("ak-flow-executor")
2021-02-25 21:54:39 +00:00
validation_stage = self.get_shadow_root(
"ak-stage-authenticator-validate", flow_executor
)
code_stage = self.get_shadow_root(
"ak-stage-authenticator-validate-code", validation_stage
)
2021-02-25 21:54:39 +00:00
code_stage.find_element(By.CSS_SELECTOR, "input[name=code]").send_keys(
totp.token()
)
2021-02-25 21:54:39 +00:00
code_stage.find_element(By.CSS_SELECTOR, "input[name=code]").send_keys(
Keys.ENTER
)
2021-02-25 21:54:39 +00:00
self.wait_for_url(self.shell_url("/library"))
2020-11-23 13:24:42 +00:00
self.assert_user(USER())
@retry()
def test_totp_setup(self):
"""test TOTP Setup stage"""
flow: Flow = Flow.objects.get(slug="default-authentication-flow")
self.driver.get(f"{self.live_server_url}/flows/{flow.slug}/")
2021-02-25 21:54:39 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).click()
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(USER().username)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(Keys.ENTER)
flow_executor = self.get_shadow_root("ak-flow-executor")
password_stage = self.get_shadow_root("ak-stage-password", flow_executor)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
USER().username
)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
Keys.ENTER
)
self.wait_for_url(self.shell_url("/library"))
2020-11-23 13:24:42 +00:00
self.assert_user(USER())
self.driver.get(
self.url(
2020-12-05 21:08:42 +00:00
"authentik_flows:configure",
stage_uuid=AuthenticatorTOTPStage.objects.first().stage_uuid,
2020-11-23 13:24:42 +00:00
)
)
2021-02-25 21:54:39 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
totp_stage = self.get_shadow_root("ak-stage-authenticator-totp", flow_executor)
wait = WebDriverWait(totp_stage, self.wait_timeout)
2021-02-25 21:54:39 +00:00
wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "input[name=otp_uri]"))
)
otp_uri = totp_stage.find_element(
By.CSS_SELECTOR, "input[name=otp_uri]"
).get_attribute("value")
# Parse the OTP URI, extract the secret and get the next token
otp_args = urlparse(otp_uri)
self.assertEqual(otp_args.scheme, "otpauth")
otp_qs = parse_qs(otp_args.query)
secret_key = b32decode(otp_qs["secret"][0])
totp = TOTP(secret_key)
2021-02-25 21:54:39 +00:00
totp_stage.find_element(By.CSS_SELECTOR, "input[name=code]").send_keys(
totp.token()
)
totp_stage.find_element(By.CSS_SELECTOR, "input[name=code]").send_keys(
Keys.ENTER
)
sleep(3)
self.assertTrue(TOTPDevice.objects.filter(user=USER(), confirmed=True).exists())
@retry()
def test_static_setup(self):
"""test Static OTP Setup stage"""
flow: Flow = Flow.objects.get(slug="default-authentication-flow")
self.driver.get(f"{self.live_server_url}/flows/{flow.slug}/")
2021-02-25 21:54:39 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root(
"ak-stage-identification", flow_executor
)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).click()
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(USER().username)
identification_stage.find_element(
By.CSS_SELECTOR, "input[name=uid_field]"
).send_keys(Keys.ENTER)
flow_executor = self.get_shadow_root("ak-flow-executor")
password_stage = self.get_shadow_root("ak-stage-password", flow_executor)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
USER().username
)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
Keys.ENTER
)
self.wait_for_url(self.shell_url("/library"))
2020-11-23 13:24:42 +00:00
self.assert_user(USER())
self.driver.get(
self.url(
2020-12-05 21:08:42 +00:00
"authentik_flows:configure",
stage_uuid=AuthenticatorStaticStage.objects.first().stage_uuid,
2020-11-23 13:24:42 +00:00
)
)
# Remember the current URL as we should end up back here
destination_url = self.driver.current_url
2021-02-25 21:54:39 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
authenticator_stage = self.get_shadow_root(
"ak-stage-authenticator-static", flow_executor
)
token = authenticator_stage.find_element(
2020-12-05 21:08:42 +00:00
By.CSS_SELECTOR, ".ak-otp-tokens li:nth-child(1)"
).text
2021-02-25 21:54:39 +00:00
authenticator_stage.find_element(By.CSS_SELECTOR, "button[type=submit]").click()
self.wait_for_url(destination_url)
sleep(1)
self.assertTrue(
StaticDevice.objects.filter(user=USER(), confirmed=True).exists()
)
device = StaticDevice.objects.filter(user=USER(), confirmed=True).first()
self.assertTrue(StaticToken.objects.filter(token=token, device=device).exists())