121 lines
4.1 KiB
Python
121 lines
4.1 KiB
Python
"""Radius e2e tests"""
|
|
from dataclasses import asdict
|
|
from time import sleep
|
|
|
|
from docker.client import DockerClient, from_env
|
|
from docker.models.containers import Container
|
|
from pyrad.client import Client
|
|
from pyrad.dictionary import Dictionary
|
|
from pyrad.packet import AccessAccept, AccessReject, AccessRequest
|
|
|
|
from authentik.blueprints.tests import apply_blueprint
|
|
from authentik.core.models import Application, User
|
|
from authentik.flows.models import Flow
|
|
from authentik.lib.generators import generate_id, generate_key
|
|
from authentik.outposts.models import Outpost, OutpostConfig, OutpostType
|
|
from authentik.providers.radius.models import RadiusProvider
|
|
from tests.e2e.utils import SeleniumTestCase, retry
|
|
|
|
|
|
class TestProviderRadius(SeleniumTestCase):
|
|
"""Radius Outpost e2e tests"""
|
|
|
|
radius_container: Container
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.shared_secret = generate_key()
|
|
|
|
def tearDown(self) -> None:
|
|
super().tearDown()
|
|
self.output_container_logs(self.radius_container)
|
|
self.radius_container.kill()
|
|
|
|
def start_radius(self, outpost: Outpost) -> Container:
|
|
"""Start radius container based on outpost created"""
|
|
client: DockerClient = from_env()
|
|
container = client.containers.run(
|
|
image=self.get_container_image("ghcr.io/goauthentik/dev-radius"),
|
|
detach=True,
|
|
ports={"1812/udp": "1812/udp"},
|
|
environment={
|
|
"AUTHENTIK_HOST": self.live_server_url,
|
|
"AUTHENTIK_TOKEN": outpost.token.key,
|
|
},
|
|
)
|
|
return container
|
|
|
|
def _prepare(self) -> User:
|
|
"""prepare user, provider, app and container"""
|
|
radius: RadiusProvider = RadiusProvider.objects.create(
|
|
name=generate_id(),
|
|
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
|
|
shared_secret=self.shared_secret,
|
|
)
|
|
# we need to create an application to actually access radius
|
|
Application.objects.create(name="radius", slug=generate_id(), provider=radius)
|
|
outpost: Outpost = Outpost.objects.create(
|
|
name=generate_id(),
|
|
type=OutpostType.RADIUS,
|
|
_config=asdict(OutpostConfig(log_level="debug")),
|
|
)
|
|
outpost.providers.add(radius)
|
|
|
|
self.radius_container = self.start_radius(outpost)
|
|
|
|
# Wait until outpost healthcheck succeeds
|
|
healthcheck_retries = 0
|
|
while healthcheck_retries < 50:
|
|
if len(outpost.state) > 0:
|
|
state = outpost.state[0]
|
|
if state.last_seen:
|
|
break
|
|
healthcheck_retries += 1
|
|
sleep(0.5)
|
|
sleep(5)
|
|
return outpost
|
|
|
|
@retry()
|
|
@apply_blueprint(
|
|
"default/flow-default-authentication-flow.yaml",
|
|
"default/flow-default-invalidation-flow.yaml",
|
|
)
|
|
def test_radius_bind_success(self):
|
|
"""Test simple bind"""
|
|
self._prepare()
|
|
srv = Client(
|
|
server="localhost",
|
|
secret=self.shared_secret.encode(),
|
|
dict=Dictionary("tests/radius-dictionary"),
|
|
)
|
|
|
|
req = srv.CreateAuthPacket(
|
|
code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
|
|
)
|
|
req["User-Password"] = req.PwCrypt(self.user.username)
|
|
|
|
reply = srv.SendPacket(req)
|
|
self.assertEqual(reply.code, AccessAccept)
|
|
|
|
@retry()
|
|
@apply_blueprint(
|
|
"default/flow-default-authentication-flow.yaml",
|
|
"default/flow-default-invalidation-flow.yaml",
|
|
)
|
|
def test_radius_bind_fail(self):
|
|
"""Test simple bind (failed)"""
|
|
self._prepare()
|
|
srv = Client(
|
|
server="localhost",
|
|
secret=self.shared_secret.encode(),
|
|
dict=Dictionary("tests/radius-dictionary"),
|
|
)
|
|
|
|
req = srv.CreateAuthPacket(
|
|
code=AccessRequest, User_Name=self.user.username, NAS_Identifier="localhost"
|
|
)
|
|
req["User-Password"] = req.PwCrypt(self.user.username + "foo")
|
|
|
|
reply = srv.SendPacket(req)
|
|
self.assertEqual(reply.code, AccessReject)
|