tests/e2e: fix tests to work without docker network_mode host (#8035)

* tests/e2e: start fixing tests to work without docker network_mode host

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* migrate saml and oauth source

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* update deps (mainly to update lxml which was causing a segfault on macos)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* migrate saml source

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* format

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix sentry env in testing

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make oauth types name and slug make more sense

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* migrate ldap

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* make tests run with --keepdb? partially?

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* migrate radius

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix proxy provider first half

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* install libxml2-dev to work around seg fault?

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* actually that doesn't change anything since use latest libxml2

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* format

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* refactor did not refactor the code

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L 2024-01-01 21:08:40 +01:00 committed by GitHub
parent b778c35396
commit b84facb9fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 836 additions and 705 deletions

View File

@ -40,10 +40,9 @@ class PytestTestRunner(DiscoverRunner): # pragma: no cover
f"ghcr.io/goauthentik/dev-%(type)s:{get_docker_tag()}",
)
CONFIG.set("error_reporting.sample_rate", 0)
sentry_init(
environment="testing",
send_default_pii=True,
)
CONFIG.set("error_reporting.environment", "testing")
CONFIG.set("error_reporting.send_pii", True)
sentry_init()
@classmethod
def add_arguments(cls, parser: ArgumentParser):

View File

@ -99,7 +99,9 @@ class OAuthSourceSerializer(SourceSerializer):
]:
if getattr(provider_type, url, None) is None:
if url not in attrs:
raise ValidationError(f"{url} is required for provider {provider_type.name}")
raise ValidationError(
f"{url} is required for provider {provider_type.verbose_name}"
)
return attrs
class Meta:

View File

@ -104,8 +104,8 @@ class AppleType(SourceType):
callback_view = AppleOAuth2Callback
redirect_view = AppleOAuthRedirect
name = "Apple"
slug = "apple"
verbose_name = "Apple"
name = "apple"
authorization_url = "https://appleid.apple.com/auth/authorize"
access_token_url = "https://appleid.apple.com/auth/token" # nosec

View File

@ -43,8 +43,8 @@ class AzureADType(SourceType):
callback_view = AzureADOAuthCallback
redirect_view = AzureADOAuthRedirect
name = "Azure AD"
slug = "azuread"
verbose_name = "Azure AD"
name = "azuread"
urls_customizable = True

View File

@ -36,8 +36,8 @@ class DiscordType(SourceType):
callback_view = DiscordOAuth2Callback
redirect_view = DiscordOAuthRedirect
name = "Discord"
slug = "discord"
verbose_name = "Discord"
name = "discord"
authorization_url = "https://discord.com/api/oauth2/authorize"
access_token_url = "https://discord.com/api/oauth2/token" # nosec

View File

@ -48,8 +48,8 @@ class FacebookType(SourceType):
callback_view = FacebookOAuth2Callback
redirect_view = FacebookOAuthRedirect
name = "Facebook"
slug = "facebook"
verbose_name = "Facebook"
name = "facebook"
authorization_url = "https://www.facebook.com/v7.0/dialog/oauth"
access_token_url = "https://graph.facebook.com/v7.0/oauth/access_token" # nosec

View File

@ -68,8 +68,8 @@ class GitHubType(SourceType):
callback_view = GitHubOAuth2Callback
redirect_view = GitHubOAuthRedirect
name = "GitHub"
slug = "github"
verbose_name = "GitHub"
name = "github"
urls_customizable = True

View File

@ -34,8 +34,8 @@ class GoogleType(SourceType):
callback_view = GoogleOAuth2Callback
redirect_view = GoogleOAuthRedirect
name = "Google"
slug = "google"
verbose_name = "Google"
name = "google"
authorization_url = "https://accounts.google.com/o/oauth2/auth"
access_token_url = "https://oauth2.googleapis.com/token" # nosec

View File

@ -63,7 +63,7 @@ class MailcowType(SourceType):
callback_view = MailcowOAuth2Callback
redirect_view = MailcowOAuthRedirect
name = "Mailcow"
slug = "mailcow"
verbose_name = "Mailcow"
name = "mailcow"
urls_customizable = True

View File

@ -42,7 +42,7 @@ class OpenIDConnectType(SourceType):
callback_view = OpenIDConnectOAuth2Callback
redirect_view = OpenIDConnectOAuthRedirect
name = "OpenID Connect"
slug = "openidconnect"
verbose_name = "OpenID Connect"
name = "openidconnect"
urls_customizable = True

View File

@ -42,7 +42,7 @@ class OktaType(SourceType):
callback_view = OktaOAuth2Callback
redirect_view = OktaOAuthRedirect
name = "Okta"
slug = "okta"
verbose_name = "Okta"
name = "okta"
urls_customizable = True

View File

@ -43,8 +43,8 @@ class PatreonType(SourceType):
callback_view = PatreonOAuthCallback
redirect_view = PatreonOAuthRedirect
name = "Patreon"
slug = "patreon"
verbose_name = "Patreon"
name = "patreon"
authorization_url = "https://www.patreon.com/oauth2/authorize"
access_token_url = "https://www.patreon.com/api/oauth2/token" # nosec

View File

@ -51,8 +51,8 @@ class RedditType(SourceType):
callback_view = RedditOAuth2Callback
redirect_view = RedditOAuthRedirect
name = "Reddit"
slug = "reddit"
verbose_name = "Reddit"
name = "reddit"
authorization_url = "https://www.reddit.com/api/v1/authorize"
access_token_url = "https://www.reddit.com/api/v1/access_token" # nosec

View File

@ -28,7 +28,7 @@ class SourceType:
callback_view = OAuthCallback
redirect_view = OAuthRedirect
name: str = "default"
slug: str = "default"
verbose_name: str = "Default source type"
urls_customizable = False
@ -41,7 +41,7 @@ class SourceType:
def icon_url(self) -> str:
"""Get Icon URL for login"""
return static(f"authentik/sources/{self.slug}.svg")
return static(f"authentik/sources/{self.name}.svg")
def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
"""Allow types to return custom challenges"""
@ -77,20 +77,20 @@ class SourceTypeRegistry:
def get_name_tuple(self):
"""Get list of tuples of all registered names"""
return [(x.slug, x.name) for x in self.__sources]
return [(x.name, x.verbose_name) for x in self.__sources]
def find_type(self, type_name: str) -> Type[SourceType]:
"""Find type based on source"""
found_type = None
for src_type in self.__sources:
if src_type.slug == type_name:
if src_type.name == type_name:
return src_type
if not found_type:
found_type = SourceType
LOGGER.warning(
"no matching type found, using default",
wanted=type_name,
have=[x.slug for x in self.__sources],
have=[x.name for x in self.__sources],
)
return found_type

View File

@ -49,8 +49,8 @@ class TwitchType(SourceType):
callback_view = TwitchOAuth2Callback
redirect_view = TwitchOAuthRedirect
name = "Twitch"
slug = "twitch"
verbose_name = "Twitch"
name = "twitch"
authorization_url = "https://id.twitch.tv/oauth2/authorize"
access_token_url = "https://id.twitch.tv/oauth2/token" # nosec

View File

@ -66,8 +66,8 @@ class TwitterType(SourceType):
callback_view = TwitterOAuthCallback
redirect_view = TwitterOAuthRedirect
name = "Twitter"
slug = "twitter"
verbose_name = "Twitter"
name = "twitter"
authorization_url = "https://twitter.com/i/oauth2/authorize"
access_token_url = "https://api.twitter.com/2/oauth2/token" # nosec

1065
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -145,7 +145,12 @@ geoip2 = "*"
gunicorn = "*"
kubernetes = "*"
ldap3 = "*"
lxml = "*"
lxml = [
# 5.0.0 works with libxml2 2.11.x, which is standard on brew
{ version = "5.0.0", platform = "darwin" },
# 4.9.x works with previous libxml2 versions, which is what we get on linux
{ version = "4.9.4", platform = "linux" },
]
opencontainers = { extras = ["reggie"], version = "*" }
packaging = "*"
paramiko = "*"

View File

@ -1,8 +1,6 @@
"""LDAP and Outpost e2e tests"""
from dataclasses import asdict
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker.client import DockerClient, from_env
from docker.models.containers import Container
@ -14,13 +12,13 @@ from authentik.blueprints.tests import apply_blueprint, reconcile_app
from authentik.core.models import Application, User
from authentik.events.models import Event, EventAction
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
from authentik.providers.ldap.models import APIAccessMode, LDAPProvider
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderLDAP(SeleniumTestCase):
"""LDAP and Outpost e2e tests"""
@ -37,7 +35,10 @@ class TestProviderLDAP(SeleniumTestCase):
container = client.containers.run(
image=self.get_container_image("ghcr.io/goauthentik/dev-ldap"),
detach=True,
network_mode="host",
ports={
"3389": "3389",
"6636": "6636",
},
environment={
"AUTHENTIK_HOST": self.live_server_url,
"AUTHENTIK_TOKEN": outpost.token.key,
@ -51,15 +52,15 @@ class TestProviderLDAP(SeleniumTestCase):
self.user.save()
ldap: LDAPProvider = LDAPProvider.objects.create(
name="ldap_provider",
name=generate_id(),
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
search_group=self.user.ak_groups.first(),
search_mode=APIAccessMode.CACHED,
)
# we need to create an application to actually access the ldap
Application.objects.create(name="ldap", slug="ldap", provider=ldap)
Application.objects.create(name=generate_id(), slug=generate_id(), provider=ldap)
outpost: Outpost = Outpost.objects.create(
name="ldap_outpost",
name=generate_id(),
type=OutpostType.LDAP,
_config=asdict(OutpostConfig(log_level="debug")),
)

View File

@ -1,8 +1,6 @@
"""test OAuth Provider flow"""
from sys import platform
from time import sleep
from typing import Any, Optional
from unittest.case import skipUnless
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
@ -18,7 +16,6 @@ from authentik.providers.oauth2.models import ClientTypes, OAuth2Provider
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderOAuth2Github(SeleniumTestCase):
"""test OAuth Provider flow"""
@ -32,7 +29,9 @@ class TestProviderOAuth2Github(SeleniumTestCase):
return {
"image": "grafana/grafana:7.1.0",
"detach": True,
"network_mode": "host",
"ports": {
"3000": "3000",
},
"auto_remove": True,
"healthcheck": Healthcheck(
test=["CMD", "wget", "--spider", "http://localhost:3000"],

View File

@ -1,8 +1,6 @@
"""test OAuth2 OpenID Provider flow"""
from sys import platform
from time import sleep
from typing import Any, Optional
from unittest.case import skipUnless
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
@ -24,7 +22,6 @@ from authentik.providers.oauth2.models import ClientTypes, OAuth2Provider, Scope
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderOAuth2OAuth(SeleniumTestCase):
"""test OAuth with OAuth Provider flow"""
@ -38,13 +35,15 @@ class TestProviderOAuth2OAuth(SeleniumTestCase):
return {
"image": "grafana/grafana:7.1.0",
"detach": True,
"network_mode": "host",
"auto_remove": True,
"healthcheck": Healthcheck(
test=["CMD", "wget", "--spider", "http://localhost:3000"],
interval=5 * 1_000 * 1_000_000,
start_period=1 * 1_000 * 1_000_000,
),
"ports": {
"3000": "3000",
},
"environment": {
"GF_AUTH_GENERIC_OAUTH_ENABLED": "true",
"GF_AUTH_GENERIC_OAUTH_CLIENT_ID": self.client_id,

View File

@ -1,8 +1,6 @@
"""test OAuth2 OpenID Provider flow"""
from json import loads
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker import DockerClient, from_env
from docker.models.containers import Container
@ -25,7 +23,6 @@ from authentik.providers.oauth2.models import ClientTypes, OAuth2Provider, Scope
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderOAuth2OIDC(SeleniumTestCase):
"""test OAuth with OpenID Provider flow"""
@ -36,13 +33,15 @@ class TestProviderOAuth2OIDC(SeleniumTestCase):
super().setUp()
def setup_client(self) -> Container:
"""Setup client saml-sp container which we test SAML against"""
"""Setup client oidc-test-client container which we test OIDC against"""
sleep(1)
client: DockerClient = from_env()
container = client.containers.run(
image="ghcr.io/beryju/oidc-test-client:1.3",
detach=True,
network_mode="host",
ports={
"9009": "9009",
},
environment={
"OIDC_CLIENT_ID": self.client_id,
"OIDC_CLIENT_SECRET": self.client_secret,

View File

@ -1,8 +1,6 @@
"""test OAuth2 OpenID Provider flow"""
from json import loads
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker import DockerClient, from_env
from docker.models.containers import Container
@ -25,7 +23,6 @@ from authentik.providers.oauth2.models import ClientTypes, OAuth2Provider, Scope
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderOAuth2OIDCImplicit(SeleniumTestCase):
"""test OAuth with OpenID Provider flow"""
@ -36,13 +33,15 @@ class TestProviderOAuth2OIDCImplicit(SeleniumTestCase):
super().setUp()
def setup_client(self) -> Container:
"""Setup client saml-sp container which we test SAML against"""
"""Setup client oidc-test-client container which we test OIDC against"""
sleep(1)
client: DockerClient = from_env()
container = client.containers.run(
image="ghcr.io/beryju/oidc-test-client:1.3",
detach=True,
network_mode="host",
ports={
"9009": "9009",
},
environment={
"OIDC_CLIENT_ID": self.client_id,
"OIDC_CLIENT_SECRET": self.client_secret,

View File

@ -21,7 +21,6 @@ from authentik.providers.proxy.models import ProxyProvider
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderProxy(SeleniumTestCase):
"""Proxy and Outpost e2e tests"""
@ -36,7 +35,9 @@ class TestProviderProxy(SeleniumTestCase):
return {
"image": "traefik/whoami:latest",
"detach": True,
"network_mode": "host",
"ports": {
"80": "80",
},
"auto_remove": True,
}
@ -46,7 +47,9 @@ class TestProviderProxy(SeleniumTestCase):
container = client.containers.run(
image=self.get_container_image("ghcr.io/goauthentik/dev-proxy"),
detach=True,
network_mode="host",
ports={
"9000": "9000",
},
environment={
"AUTHENTIK_HOST": self.live_server_url,
"AUTHENTIK_TOKEN": outpost.token.key,
@ -78,7 +81,7 @@ class TestProviderProxy(SeleniumTestCase):
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
internal_host="http://localhost",
internal_host=f"http://{self.host}",
external_host="http://localhost:9000",
)
# Ensure OAuth2 Params are set
@ -145,7 +148,7 @@ class TestProviderProxy(SeleniumTestCase):
authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
),
internal_host="http://localhost",
internal_host=f"http://{self.host}",
external_host="http://localhost:9000",
basic_auth_enabled=True,
basic_auth_user_attribute="basic-username",

View File

@ -1,8 +1,6 @@
"""Radius e2e tests"""
from dataclasses import asdict
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker.client import DockerClient, from_env
from docker.models.containers import Container
@ -19,7 +17,6 @@ from authentik.providers.radius.models import RadiusProvider
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderRadius(SeleniumTestCase):
"""Radius Outpost e2e tests"""
@ -40,7 +37,7 @@ class TestProviderRadius(SeleniumTestCase):
container = client.containers.run(
image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
detach=True,
network_mode="host",
ports={"1812/udp": "1812/udp"},
environment={
"AUTHENTIK_HOST": self.live_server_url,
"AUTHENTIK_TOKEN": outpost.token.key,

View File

@ -1,8 +1,6 @@
"""test SAML Provider flow"""
from json import loads
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker import DockerClient, from_env
from docker.models.containers import Container
@ -20,7 +18,6 @@ from authentik.sources.saml.processors.constants import SAML_BINDING_POST
from tests.e2e.utils import SeleniumTestCase, retry
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderSAML(SeleniumTestCase):
"""test SAML Provider flow"""
@ -41,7 +38,9 @@ class TestProviderSAML(SeleniumTestCase):
container = client.containers.run(
image="ghcr.io/beryju/saml-test-sp:1.1",
detach=True,
network_mode="host",
ports={
"9009": "9009",
},
environment={
"SP_ENTITY_ID": provider.issuer,
"SP_SSO_BINDING": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST",

View File

@ -0,0 +1,141 @@
"""test OAuth Source"""
from time import sleep
from typing import Any, Optional
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.support.wait import WebDriverWait
from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import User
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id, generate_key
from authentik.sources.oauth.models import OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.stages.identification.models import IdentificationStage
from tests.e2e.utils import SeleniumTestCase, retry
class OAuth1Callback(OAuthCallback):
"""OAuth1 Callback with custom getters"""
def get_user_id(self, info: dict[str, str]) -> str:
return info.get("id")
def get_user_enroll_context(
self,
info: dict[str, Any],
) -> dict[str, Any]:
return {
"username": info.get("screen_name"),
"email": info.get("email"),
"name": info.get("name"),
}
@registry.register()
class OAUth1Type(SourceType):
"""OAuth1 Type definition"""
callback_view = OAuth1Callback
verbose_name = "OAuth1"
name = "oauth1"
request_token_url = "http://localhost:5001/oauth/request_token" # nosec
access_token_url = "http://localhost:5001/oauth/access_token" # nosec
authorization_url = "http://localhost:5001/oauth/authorize"
profile_url = "http://localhost:5001/api/me"
urls_customizable = False
class TestSourceOAuth1(SeleniumTestCase):
"""Test OAuth1 Source"""
def setUp(self) -> None:
self.client_id = generate_id()
self.client_secret = generate_key()
self.source_slug = generate_id()
super().setUp()
def get_container_specs(self) -> Optional[dict[str, Any]]:
return {
"image": "ghcr.io/beryju/oauth1-test-server:v1.1",
"detach": True,
"ports": {"5000": "5001"},
"auto_remove": True,
"environment": {
"OAUTH1_CLIENT_ID": self.client_id,
"OAUTH1_CLIENT_SECRET": self.client_secret,
"OAUTH1_REDIRECT_URI": self.url(
"authentik_sources_oauth:oauth-client-callback",
source_slug=self.source_slug,
),
},
}
def create_objects(self):
"""Create required objects"""
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
source = OAuthSource.objects.create( # nosec
name=generate_id(),
slug=self.source_slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="oauth1",
consumer_key=self.client_id,
consumer_secret=self.client_secret,
)
ident_stage = IdentificationStage.objects.first()
ident_stage.sources.set([source])
ident_stage.save()
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/flow-default-source-authentication.yaml",
"default/flow-default-source-enrollment.yaml",
"default/flow-default-source-pre-authentication.yaml",
)
def test_oauth_enroll(self):
"""test OAuth Source With With OIDC"""
self.create_objects()
self.driver.get(self.live_server_url)
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
ec.presence_of_element_located(
(By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button")
)
)
identification_stage.find_element(
By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
self.driver.find_element(By.NAME, "username").send_keys("example-user")
self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)
sleep(2)
# Wait until we're logged in
self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']")))
self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()
# Wait until we've loaded the user info page
sleep(2)
# Wait until we've logged in
self.wait_for_url(self.if_user_url("/library"))
self.driver.get(self.if_user_url("/settings"))
self.assert_user(User(username="example-user", name="test name", email="foo@example.com"))

View File

@ -1,9 +1,7 @@
"""test OAuth Source"""
from pathlib import Path
from sys import platform
from time import sleep
from typing import Any, Optional
from unittest.case import skipUnless
from docker.models.containers import Container
from docker.types import Healthcheck
@ -18,47 +16,12 @@ from authentik.core.models import User
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id, generate_key
from authentik.sources.oauth.models import OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.stages.identification.models import IdentificationStage
from tests.e2e.utils import SeleniumTestCase, retry
CONFIG_PATH = "/tmp/dex.yml" # nosec
class OAuth1Callback(OAuthCallback):
"""OAuth1 Callback with custom getters"""
def get_user_id(self, info: dict[str, str]) -> str:
return info.get("id")
def get_user_enroll_context(
self,
info: dict[str, Any],
) -> dict[str, Any]:
return {
"username": info.get("screen_name"),
"email": info.get("email"),
"name": info.get("name"),
}
@registry.register()
class OAUth1Type(SourceType):
"""OAuth1 Type definition"""
callback_view = OAuth1Callback
name = "OAuth1"
slug = "oauth1"
request_token_url = "http://localhost:5000/oauth/request_token" # nosec
access_token_url = "http://localhost:5000/oauth/access_token" # nosec
authorization_url = "http://localhost:5000/oauth/authorize"
profile_url = "http://localhost:5000/api/me"
urls_customizable = False
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceOAuth2(SeleniumTestCase):
"""test OAuth Source flow"""
@ -66,6 +29,7 @@ class TestSourceOAuth2(SeleniumTestCase):
def setUp(self):
self.client_secret = generate_key()
self.slug = generate_id()
self.prepare_dex_config()
super().setUp()
@ -83,7 +47,7 @@ class TestSourceOAuth2(SeleniumTestCase):
"redirectURIs": [
self.url(
"authentik_sources_oauth:oauth-client-callback",
source_slug="dex",
source_slug=self.slug,
)
],
"secret": self.client_secret,
@ -108,7 +72,7 @@ class TestSourceOAuth2(SeleniumTestCase):
return {
"image": "ghcr.io/dexidp/dex:v2.28.1",
"detach": True,
"network_mode": "host",
"ports": {"5556": "5556"},
"auto_remove": True,
"command": "dex serve /config.yml",
"healthcheck": Healthcheck(
@ -126,8 +90,8 @@ class TestSourceOAuth2(SeleniumTestCase):
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
source = OAuthSource.objects.create( # nosec
name="dex",
slug="dex",
name=generate_id(),
slug=self.slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="openidconnect",
@ -229,95 +193,3 @@ class TestSourceOAuth2(SeleniumTestCase):
self.driver.get(self.if_user_url("/settings"))
self.assert_user(User(username="foo", name="admin", email="admin@example.com"))
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceOAuth1(SeleniumTestCase):
"""Test OAuth1 Source"""
def setUp(self) -> None:
self.client_id = generate_id()
self.client_secret = generate_key()
self.source_slug = "oauth1-test"
super().setUp()
def get_container_specs(self) -> Optional[dict[str, Any]]:
return {
"image": "ghcr.io/beryju/oauth1-test-server:v1.1",
"detach": True,
"network_mode": "host",
"auto_remove": True,
"environment": {
"OAUTH1_CLIENT_ID": self.client_id,
"OAUTH1_CLIENT_SECRET": self.client_secret,
"OAUTH1_REDIRECT_URI": self.url(
"authentik_sources_oauth:oauth-client-callback",
source_slug=self.source_slug,
),
},
}
def create_objects(self):
"""Create required objects"""
# Bootstrap all needed objects
authentication_flow = Flow.objects.get(slug="default-source-authentication")
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
source = OAuthSource.objects.create( # nosec
name="oauth1",
slug=self.source_slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
provider_type="oauth1",
consumer_key=self.client_id,
consumer_secret=self.client_secret,
)
ident_stage = IdentificationStage.objects.first()
ident_stage.sources.set([source])
ident_stage.save()
@retry()
@apply_blueprint(
"default/flow-default-authentication-flow.yaml",
"default/flow-default-invalidation-flow.yaml",
)
@apply_blueprint(
"default/flow-default-source-authentication.yaml",
"default/flow-default-source-enrollment.yaml",
"default/flow-default-source-pre-authentication.yaml",
)
def test_oauth_enroll(self):
"""test OAuth Source With With OIDC"""
self.create_objects()
self.driver.get(self.live_server_url)
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor)
wait = WebDriverWait(identification_stage, self.wait_timeout)
wait.until(
ec.presence_of_element_located(
(By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button")
)
)
identification_stage.find_element(
By.CSS_SELECTOR, ".pf-c-login__main-footer-links-item > button"
).click()
# Now we should be at the IDP, wait for the login field
self.wait.until(ec.presence_of_element_located((By.NAME, "username")))
self.driver.find_element(By.NAME, "username").send_keys("example-user")
self.driver.find_element(By.NAME, "username").send_keys(Keys.ENTER)
sleep(2)
# Wait until we're logged in
self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "[name='confirm']")))
self.driver.find_element(By.CSS_SELECTOR, "[name='confirm']").click()
# Wait until we've loaded the user info page
sleep(2)
# Wait until we've logged in
self.wait_for_url(self.if_user_url("/library"))
self.driver.get(self.if_user_url("/settings"))
self.assert_user(User(username="example-user", name="test name", email="foo@example.com"))

View File

@ -1,8 +1,6 @@
"""test SAML Source"""
from sys import platform
from time import sleep
from typing import Any, Optional
from unittest.case import skipUnless
from docker.types import Healthcheck
from guardian.utils import get_anonymous_user
@ -15,6 +13,7 @@ from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import User
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow
from authentik.lib.generators import generate_id
from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
from authentik.stages.identification.models import IdentificationStage
from tests.e2e.utils import SeleniumTestCase, retry
@ -71,15 +70,18 @@ Sm75WXsflOxuTn08LbgGc4s=
-----END PRIVATE KEY-----"""
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestSourceSAML(SeleniumTestCase):
"""test SAML Source flow"""
def setUp(self):
self.slug = generate_id()
super().setUp()
def get_container_specs(self) -> Optional[dict[str, Any]]:
return {
"image": "kristophjunge/test-saml-idp:1.15",
"detach": True,
"network_mode": "host",
"ports": {"8080": "8080"},
"auto_remove": True,
"healthcheck": Healthcheck(
test=["CMD", "curl", "http://localhost:8080"],
@ -89,7 +91,7 @@ class TestSourceSAML(SeleniumTestCase):
"environment": {
"SIMPLESAMLPHP_SP_ENTITY_ID": "entity-id",
"SIMPLESAMLPHP_SP_ASSERTION_CONSUMER_SERVICE": (
f"{self.live_server_url}/source/saml/saml-idp-test/acs/"
self.url("authentik_sources_saml:acs", source_slug=self.slug)
),
},
}
@ -111,19 +113,19 @@ class TestSourceSAML(SeleniumTestCase):
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
pre_authentication_flow = Flow.objects.get(slug="default-source-pre-authentication")
keypair = CertificateKeyPair.objects.create(
name="test-idp-cert",
name=generate_id(),
certificate_data=IDP_CERT,
key_data=IDP_KEY,
)
source = SAMLSource.objects.create(
name="saml-idp-test",
slug="saml-idp-test",
name=generate_id(),
slug=self.slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
pre_authentication_flow=pre_authentication_flow,
issuer="entity-id",
sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php",
sso_url=f"http://{self.host}:8080/simplesaml/saml2/idp/SSOService.php",
binding_type=SAMLBindingTypes.REDIRECT,
signing_kp=keypair,
)
@ -181,19 +183,19 @@ class TestSourceSAML(SeleniumTestCase):
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
pre_authentication_flow = Flow.objects.get(slug="default-source-pre-authentication")
keypair = CertificateKeyPair.objects.create(
name="test-idp-cert",
name=generate_id(),
certificate_data=IDP_CERT,
key_data=IDP_KEY,
)
source = SAMLSource.objects.create(
name="saml-idp-test",
slug="saml-idp-test",
name=generate_id(),
slug=self.slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
pre_authentication_flow=pre_authentication_flow,
issuer="entity-id",
sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php",
sso_url=f"http://{self.host}:8080/simplesaml/saml2/idp/SSOService.php",
binding_type=SAMLBindingTypes.POST,
signing_kp=keypair,
)
@ -264,19 +266,19 @@ class TestSourceSAML(SeleniumTestCase):
enrollment_flow = Flow.objects.get(slug="default-source-enrollment")
pre_authentication_flow = Flow.objects.get(slug="default-source-pre-authentication")
keypair = CertificateKeyPair.objects.create(
name="test-idp-cert",
name=generate_id(),
certificate_data=IDP_CERT,
key_data=IDP_KEY,
)
source = SAMLSource.objects.create(
name="saml-idp-test",
slug="saml-idp-test",
name=generate_id(),
slug=self.slug,
authentication_flow=authentication_flow,
enrollment_flow=enrollment_flow,
pre_authentication_flow=pre_authentication_flow,
issuer="entity-id",
sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php",
sso_url=f"http://{self.host}:8080/simplesaml/saml2/idp/SSOService.php",
binding_type=SAMLBindingTypes.POST_AUTO,
signing_kp=keypair,
)

View File

@ -1,6 +1,7 @@
"""authentik e2e testing utilities"""
import json
import os
import socket
from functools import lru_cache, wraps
from os import environ
from sys import stderr
@ -43,6 +44,13 @@ def get_docker_tag() -> str:
return f"gh-{branch_name}"
def get_local_ip() -> str:
"""Get the local machine's IP"""
hostname = socket.gethostname()
ip_addr = socket.gethostbyname(hostname)
return ip_addr
class DockerTestCase:
"""Mixin for dealing with containers"""
@ -63,6 +71,7 @@ class DockerTestCase:
class SeleniumTestCase(DockerTestCase, StaticLiveServerTestCase):
"""StaticLiveServerTestCase which automatically creates a Webdriver instance"""
host = get_local_ip()
container: Optional[Container] = None
wait_timeout: int
user: User