This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/tests/e2e/test_source_oauth_oauth1.py

142 lines
5.1 KiB
Python
Raw Normal View History

2024-01-01 20:08:40 +00:00
"""test OAuth Source"""
from time import sleep
from typing import Any, Optional
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import User
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id, generate_key
from authentik.sources.oauth.models import OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.stages.identification.models import IdentificationStage
from tests.e2e.utils import SeleniumTestCase, retry
class OAuth1Callback(OAuthCallback):
"""OAuth1 Callback with custom getters"""
def get_user_id(self, info: dict[str, str]) -> str:
return info.get("id")
def get_user_enroll_context(
self,
info: dict[str, Any],
) -> dict[str, Any]:
return {
"username": info.get("screen_name"),
"email": info.get("email"),
"name": info.get("name"),
}
@registry.register()
class OAUth1Type(SourceType):
"""OAuth1 Type definition"""
callback_view = OAuth1Callback
verbose_name = "OAuth1"
name = "oauth1"
request_token_url = "http://localhost:5001/oauth/request_token" # nosec
access_token_url = "http://localhost:5001/oauth/access_token" # nosec
authorization_url = "http://localhost:5001/oauth/authorize"
profile_url = "http://localhost:5001/api/me"
urls_customizable = False
class TestSourceOAuth1(SeleniumTestCase):
"""Test OAuth1 Source"""
def setUp(self) -> None:
self.client_id = generate_id()
self.client_secret = generate_key()
self.source_slug = generate_id()
super().setUp()
def get_container_specs(self) -> Optional[dict[str, Any]]:
return {
"image": "ghcr.io/beryju/oauth1-test-server:v1.1",
"detach": True,
"ports": {"5000": "5001"},
"auto_remove": True,
"environment": {
"OAUTH1_CLIENT_ID": self.client_id,
"OAUTH1_CLIENT_SECRET": self.client_secret,
"OAUTH1_REDIRECT_URI": self.url(
"authentik_sources_oauth:oauth-client-callback",
source_slug=self.source_slug,
),
},
}
def create_objects(self):
"""Create required objects"""
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
source = OAuthSource.objects.create( # nosec
name=generate_id(),
slug=self.source_slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="oauth1",
consumer_key=self.client_id,
consumer_secret=self.client_secret,
)
ident_stage = IdentificationStage.objects.first()
ident_stage.sources.set([source])
ident_stage.save()
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/flow-default-source-authentication.yaml",
"default/flow-default-source-enrollment.yaml",
"default/flow-default-source-pre-authentication.yaml",
)
def test_oauth_enroll(self):
"""test OAuth Source With With OIDC"""
self.create_objects()
self.driver.get(self.live_server_url)
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
ec.presence_of_element_located(
(By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button")
)
)
identification_stage.find_element(
By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
self.driver.find_element(By.NAME, "username").send_keys("example-user")
self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)
sleep(2)
# Wait until we're logged in
self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']")))
self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()
# Wait until we've loaded the user info page
sleep(2)
# Wait until we've logged in
self.wait_for_url(self.if_user_url("/library"))
self.driver.get(self.if_user_url("/settings"))
self.assert_user(User(username="example-user", name="test name", email="foo@example.com"))