stages/authenticator: vendor otp (#6741)

* initial import

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* update imports

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove email and hotp for now

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove things we don't need and clean up

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* initial merge static

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* initial merge totp

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* more fixes

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix migrations

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* update webui

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add system migration

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* more cleanup, add doctests to test_runner

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* more cleanup

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fixup more lint

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* cleanup last tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* update docstrings

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* implement SerializerModel

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix web format

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L 2023-09-04 11:45:14 +02:00 committed by GitHub
parent 3f12c7c013
commit 6612f729ec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
45 changed files with 2046 additions and 107 deletions

View file

@ -1,6 +1,4 @@
"""Authenticator Devices API Views"""
from django_otp import device_classes, devices_for_user
from django_otp.models import Device
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework.fields import BooleanField, CharField, IntegerField, SerializerMethodField
@ -10,6 +8,8 @@ from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from authentik.core.api.utils import MetaNameSerializer
from authentik.stages.authenticator import device_classes, devices_for_user
from authentik.stages.authenticator.models import Device
class DeviceSerializer(MetaNameSerializer):

View file

@ -25,10 +25,10 @@ def create_test_admin_user(name: Optional[str] = None, **kwargs) -> User:
"""Generate a test-admin user"""
uid = generate_id(20) if not name else name
group = Group.objects.create(name=uid, is_superuser=True)
kwargs.setdefault("email", f"{uid}@goauthentik.io")
kwargs.setdefault("username", uid)
user: User = User.objects.create(
username=uid,
name=uid,
email=f"{uid}@goauthentik.io",
**kwargs,
)
user.set_password(uid)

View file

@ -1,44 +1,30 @@
"""Enterprise license policies"""
from typing import Optional
from rest_framework.serializers import BaseSerializer
from authentik.core.models import User, UserTypes
from authentik.enterprise.models import LicenseKey
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.policies.views import PolicyAccessView
class EnterprisePolicy(Policy):
"""Check that a user is correctly licensed for the request"""
@property
def component(self) -> str:
return ""
@property
def serializer(self) -> type[BaseSerializer]:
raise NotImplementedError
def passes(self, request: PolicyRequest) -> PolicyResult:
if not LicenseKey.get_total().is_valid():
return PolicyResult(False)
if request.user.type != UserTypes.INTERNAL:
return PolicyResult(False)
return PolicyResult(True)
class EnterprisePolicyAccessView(PolicyAccessView):
"""PolicyAccessView which also checks enterprise licensing"""
def check_license(self):
"""Check license"""
if not LicenseKey.get_total().is_valid():
return False
if self.request.user.type != UserTypes.INTERNAL:
return False
return True
def user_has_access(self, user: Optional[User] = None) -> PolicyResult:
user = user or self.request.user
request = PolicyRequest(user)
request.http_request = self.request
result = super().user_has_access(user)
enterprise_result = EnterprisePolicy().passes(request)
if not enterprise_result.passing:
enterprise_result = self.check_license()
if not enterprise_result:
return enterprise_result
return result

View file

@ -9,7 +9,6 @@ from django.core.exceptions import SuspiciousOperation
from django.db.models import Model
from django.db.models.signals import m2m_changed, post_save, pre_delete
from django.http import HttpRequest, HttpResponse
from django_otp.plugins.otp_static.models import StaticToken
from guardian.models import UserObjectPermission
from authentik.core.models import (
@ -30,6 +29,7 @@ from authentik.outposts.models import OutpostServiceConnection
from authentik.policies.models import Policy, PolicyBindingModel
from authentik.providers.oauth2.models import AccessToken, AuthorizationCode, RefreshToken
from authentik.providers.scim.models import SCIMGroup, SCIMUser
from authentik.stages.authenticator_static.models import StaticToken
IGNORED_MODELS = (
Event,

View file

@ -7,7 +7,6 @@ from typing import Any, Iterable, Optional
from cachetools import TLRUCache, cached
from django.core.exceptions import FieldError
from django_otp import devices_for_user
from guardian.shortcuts import get_anonymous_user
from rest_framework.serializers import ValidationError
from sentry_sdk.hub import Hub
@ -20,6 +19,7 @@ from authentik.lib.utils.http import get_http_session
from authentik.policies.models import Policy, PolicyBinding
from authentik.policies.process import PolicyProcess
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.stages.authenticator import devices_for_user
LOGGER = get_logger()

View file

@ -81,6 +81,7 @@ INSTALLED_APPS = [
"authentik.sources.oauth",
"authentik.sources.plex",
"authentik.sources.saml",
"authentik.stages.authenticator",
"authentik.stages.authenticator_duo",
"authentik.stages.authenticator_sms",
"authentik.stages.authenticator_static",

View file

@ -20,7 +20,7 @@ class PytestTestRunner: # pragma: no cover
self.failfast = failfast
self.keepdb = keepdb
self.args = ["-vv", "--full-trace"]
self.args = []
if self.failfast:
self.args.append("--exitfirst")
if self.keepdb:

View file

@ -0,0 +1,129 @@
"""Authenticator devices helpers"""
from django.db import transaction
def verify_token(user, device_id, token):
"""
Attempts to verify a :term:`token` against a specific device, identified by
:attr:`~authentik.stages.authenticator.models.Device.persistent_id`.
This wraps the verification process in a transaction to ensure that things
like throttling polices are properly enforced.
:param user: The user supplying the token.
:type user: :class:`~django.contrib.auth.models.User`
:param str device_id: A device's persistent_id value.
:param str token: An OTP token to verify.
:returns: The device that accepted ``token``, if any.
:rtype: :class:`~authentik.stages.authenticator.models.Device` or ``None``
"""
from authentik.stages.authenticator.models import Device
verified = None
with transaction.atomic():
device = Device.from_persistent_id(device_id, for_verify=True)
if (device is not None) and (device.user_id == user.pk) and device.verify_token(token):
verified = device
return verified
def match_token(user, token):
"""
Attempts to verify a :term:`token` on every device attached to the given
user until one of them succeeds.
.. warning::
This originally existed for more convenient integration with the admin
site. Its use is no longer recommended and it is not guaranteed to
interact well with more recent features (such as throttling). Tokens
should always be verified against specific devices.
:param user: The user supplying the token.
:type user: :class:`~django.contrib.auth.models.User`
:param str token: An OTP token to verify.
:returns: The device that accepted ``token``, if any.
:rtype: :class:`~authentik.stages.authenticator.models.Device` or ``None``
"""
with transaction.atomic():
for device in devices_for_user(user, for_verify=True):
if device.verify_token(token):
break
else:
device = None
return device
def devices_for_user(user, confirmed=True, for_verify=False):
"""
Return an iterable of all devices registered to the given user.
Returns an empty iterable for anonymous users.
:param user: standard or custom user object.
:type user: :class:`~django.contrib.auth.models.User`
:param bool confirmed: If ``None``, all matching devices are returned.
Otherwise, this can be any true or false value to limit the query
to confirmed or unconfirmed devices, respectively.
:param bool for_verify: If ``True``, we'll load the devices with
:meth:`~django.db.models.query.QuerySet.select_for_update` to prevent
concurrent verifications from succeeding. In which case, this must be
called inside a transaction.
:rtype: iterable
"""
if user.is_anonymous:
return
for model in device_classes():
device_set = model.objects.devices_for_user(user, confirmed=confirmed)
if for_verify:
device_set = device_set.select_for_update()
yield from device_set
def user_has_device(user, confirmed=True):
"""
Return ``True`` if the user has at least one device.
Returns ``False`` for anonymous users.
:param user: standard or custom user object.
:type user: :class:`~django.contrib.auth.models.User`
:param confirmed: If ``None``, all matching devices are considered.
Otherwise, this can be any true or false value to limit the query
to confirmed or unconfirmed devices, respectively.
"""
try:
next(devices_for_user(user, confirmed=confirmed))
except StopIteration:
has_device = False
else:
has_device = True
return has_device
def device_classes():
"""
Returns an iterable of all loaded device models.
"""
from django.apps import apps # isort: skip
from authentik.stages.authenticator.models import Device
for config in apps.get_app_configs():
for model in config.get_models():
if issubclass(model, Device):
yield model

View file

@ -0,0 +1,10 @@
"""Authenticator"""
from django.apps import AppConfig
class AuthentikStageAuthenticatorConfig(AppConfig):
"""Authenticator App config"""
name = "authentik.stages.authenticator"
label = "authentik_stages_authenticator"
verbose_name = "authentik Stages.Authenticator"

View file

@ -0,0 +1,401 @@
"""Base authenticator models"""
from datetime import timedelta
from django.apps import apps
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.functional import cached_property
from authentik.core.models import User
from authentik.stages.authenticator.util import random_number_token
class DeviceManager(models.Manager):
"""
The :class:`~django.db.models.Manager` object installed as
``Device.objects``.
"""
def devices_for_user(self, user, confirmed=None):
"""
Returns a queryset for all devices of this class that belong to the
given user.
:param user: The user.
:type user: :class:`~django.contrib.auth.models.User`
:param confirmed: If ``None``, all matching devices are returned.
Otherwise, this can be any true or false value to limit the query
to confirmed or unconfirmed devices, respectively.
"""
devices = self.model.objects.filter(user=user)
if confirmed is not None:
devices = devices.filter(confirmed=bool(confirmed))
return devices
class Device(models.Model):
"""
Abstract base model for a :term:`device` attached to a user. Plugins must
subclass this to define their OTP models.
.. _unsaved_device_warning:
.. warning::
OTP devices are inherently stateful. For example, verifying a token is
logically a mutating operation on the device, which may involve
incrementing a counter or otherwise consuming a token. A device must be
committed to the database before it can be used in any way.
.. attribute:: user
*ForeignKey*: Foreign key to your user model, as configured by
:setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
by default).
.. attribute:: name
*CharField*: A human-readable name to help the user identify their
devices.
.. attribute:: confirmed
*BooleanField*: A boolean value that tells us whether this device has
been confirmed as valid. It defaults to ``True``, but subclasses or
individual deployments can force it to ``False`` if they wish to create
a device and then ask the user for confirmation. As a rule, built-in
APIs that enumerate devices will only include those that are confirmed.
.. attribute:: objects
A :class:`~authentik.stages.authenticator.models.DeviceManager`.
"""
user = models.ForeignKey(
User,
help_text="The user that this device belongs to.",
on_delete=models.CASCADE,
)
name = models.CharField(max_length=64, help_text="The human-readable name of this device.")
confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?")
objects = DeviceManager()
class Meta:
abstract = True
def __str__(self):
try:
user = self.user
except ObjectDoesNotExist:
user = None
return "{0} ({1})".format(self.name, user)
@property
def persistent_id(self):
"""
A stable device identifier for forms and APIs.
"""
return "{0}/{1}".format(self.model_label(), self.id)
@classmethod
def model_label(cls):
"""
Returns an identifier for this Django model class.
This is just the standard "<app_label>.<model_name>" form.
"""
return "{0}.{1}".format(cls._meta.app_label, cls._meta.model_name)
@classmethod
def from_persistent_id(cls, persistent_id, for_verify=False):
"""
Loads a device from its persistent id::
device == Device.from_persistent_id(device.persistent_id)
:param bool for_verify: If ``True``, we'll load the device with
:meth:`~django.db.models.query.QuerySet.select_for_update` to
prevent concurrent verifications from succeeding. In which case,
this must be called inside a transaction.
"""
device = None
try:
model_label, device_id = persistent_id.rsplit("/", 1)
app_label, model_name = model_label.split(".")
device_cls = apps.get_model(app_label, model_name)
if issubclass(device_cls, Device):
device_set = device_cls.objects.filter(id=int(device_id))
if for_verify:
device_set = device_set.select_for_update()
device = device_set.first()
except (ValueError, LookupError):
pass
return device
def is_interactive(self):
"""
Returns ``True`` if this is an interactive device. The default
implementation returns ``True`` if
:meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been
overridden, but subclasses are welcome to provide smarter
implementations.
:rtype: bool
"""
return not hasattr(self.generate_challenge, "stub")
def generate_challenge(self):
"""
Generates a challenge value that the user will need to produce a token.
This method is permitted to have side effects, such as transmitting
information to the user through some other channel (email or SMS,
perhaps). And, of course, some devices may need to commit the
challenge to the database.
:returns: A message to the user. This should be a string that fits
comfortably in the template ``'OTP Challenge: {0}'``. This may
return ``None`` if this device is not interactive.
:rtype: string or ``None``
:raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
trap ``Exception`` and report it to the user.
"""
return None
generate_challenge.stub = True
def verify_is_allowed(self):
"""
Checks whether it is permissible to call :meth:`verify_token`. If it is
allowed, returns ``(True, None)``. Otherwise returns ``(False,
data_dict)``, where ``data_dict`` contains extra information, defined
by the implementation.
This method can be used to implement throttling or locking, for
example. Client code should check this method before calling
:meth:`verify_token` and report problems to the user.
To report specific problems, the data dictionary can return include a
``'reason'`` member with a value from the constants in
:class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
should be provided with an error message.
:meth:`verify_token` should also call this method and return False if
verification is not allowed.
:rtype: (bool, dict or ``None``)
"""
return (True, None)
def verify_token(self, token):
"""
Verifies a token. As a rule, the token should no longer be valid if
this returns ``True``.
:param str token: The OTP token provided by the user.
:rtype: bool
"""
return False
class SideChannelDevice(Device):
"""
Abstract base model for a side-channel :term:`device` attached to a user.
This model implements token generation, verification and expiration, so the
concrete devices only have to implement delivery.
"""
token = models.CharField(max_length=16, blank=True, null=True)
valid_until = models.DateTimeField(
default=timezone.now,
help_text="The timestamp of the moment of expiry of the saved token.",
)
class Meta:
abstract = True
def generate_token(self, length=6, valid_secs=300, commit=True):
"""
Generates a token of the specified length, then sets it on the model
and sets the expiration of the token on the model.
Pass 'commit=False' to avoid calling self.save().
:param int length: Number of decimal digits in the generated token.
:param int valid_secs: Amount of seconds the token should be valid.
:param bool commit: Whether to autosave the generated token.
"""
self.token = random_number_token(length)
self.valid_until = timezone.now() + timedelta(seconds=valid_secs)
if commit:
self.save()
def verify_token(self, token):
"""
Verifies a token by content and expiry.
On success, the token is cleared and the device saved.
:param str token: The OTP token provided by the user.
:rtype: bool
"""
_now = timezone.now()
if (self.token is not None) and (token == self.token) and (_now < self.valid_until):
self.token = None
self.valid_until = _now
self.save()
return True
return False
class VerifyNotAllowed:
"""
Constants that may be returned in the ``reason`` member of the extra
information dictionary returned by
:meth:`~authentik.stages.authenticator.models.Device.verify_is_allowed`
.. data:: N_FAILED_ATTEMPTS
Indicates that verification is disallowed because of ``n`` successive
failed attempts. The data dictionary should include the value of ``n``
in member ``failure_count``
"""
N_FAILED_ATTEMPTS = "N_FAILED_ATTEMPTS"
class ThrottlingMixin(models.Model):
"""
Mixin class for models that want throttling behaviour.
This implements exponential back-off for verifying tokens. Subclasses must
implement :meth:`get_throttle_factor`, and must use the
:meth:`verify_is_allowed`, :meth:`throttle_reset` and
:meth:`throttle_increment` methods from within their verify_token() method.
See the implementation of
:class:`~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice` for an example.
"""
throttling_failure_timestamp = models.DateTimeField(
null=True,
blank=True,
default=None,
help_text=(
"A timestamp of the last failed verification attempt. "
"Null if last attempt succeeded."
),
)
throttling_failure_count = models.PositiveIntegerField(
default=0, help_text="Number of successive failed attempts."
)
def verify_is_allowed(self):
"""
If verification is allowed, returns ``(True, None)``.
Otherwise, returns ``(False, data_dict)``.
``data_dict`` contains further information. Currently it can be::
{
'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
'failure_count': n
}
where ``n`` is the number of successive failures. See
:class:`~authentik.stages.authenticator.models.VerifyNotAllowed`.
"""
if (
self.throttling_enabled
and self.throttling_failure_count > 0
and self.throttling_failure_timestamp is not None
):
now = timezone.now()
delay = (now - self.throttling_failure_timestamp).total_seconds()
# Required delays should be 1, 2, 4, 8 ...
delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
if delay < delay_required:
return (
False,
{
"reason": VerifyNotAllowed.N_FAILED_ATTEMPTS,
"failure_count": self.throttling_failure_count,
"locked_until": self.throttling_failure_timestamp
+ timedelta(seconds=delay_required),
},
)
return super().verify_is_allowed()
def throttle_reset(self, commit=True):
"""
Call this method to reset throttling (normally when a verify attempt
succeeded).
Pass 'commit=False' to avoid calling self.save().
"""
self.throttling_failure_timestamp = None
self.throttling_failure_count = 0
if commit:
self.save()
def throttle_increment(self, commit=True):
"""
Call this method to increase throttling (normally when a verify attempt
failed).
Pass 'commit=False' to avoid calling self.save().
"""
self.throttling_failure_timestamp = timezone.now()
self.throttling_failure_count += 1
if commit:
self.save()
@cached_property
def throttling_enabled(self) -> bool:
"""Check if throttling is enabled"""
return self.get_throttle_factor() > 0
def get_throttle_factor(self): # pragma: no cover
"""
This must be implemented to return the throttle factor.
The number of seconds required between verification attempts will be
:math:`c2^{n-1}` where `c` is this factor and `n` is the number of
previous failures. A factor of 1 translates to delays of 1, 2, 4, 8,
etc. seconds. A factor of 0 disables the throttling.
Normally this is just a wrapper for a plugin-specific setting like
:setting:`OTP_EMAIL_THROTTLE_FACTOR`.
"""
raise NotImplementedError()
class Meta:
abstract = True

View file

@ -0,0 +1,199 @@
"""OATH helpers"""
import hmac
from hashlib import sha1
from struct import pack
from time import time
# pylint: disable=invalid-name
def hotp(key: bytes, counter: int, digits=6) -> int:
"""
Implementation of the HOTP algorithm from `RFC 4226
<http://tools.ietf.org/html/rfc4226#section-5>`_.
:param bytes key: The shared secret. A 20-byte string is recommended.
:param int counter: The password counter.
:param int digits: The number of decimal digits to generate.
:returns: The HOTP token.
:rtype: int
>>> key = b'12345678901234567890'
>>> for c in range(10):
... hotp(key, c)
755224
287082
359152
969429
338314
254676
287922
162583
399871
520489
"""
msg = pack(b">Q", counter)
hs = hmac.new(key, msg, sha1).digest()
hs = list(iter(hs))
offset = hs[19] & 0x0F
bin_code = (
(hs[offset] & 0x7F) << 24 | hs[offset + 1] << 16 | hs[offset + 2] << 8 | hs[offset + 3]
)
return bin_code % pow(10, digits)
def totp(key: bytes, step=30, t0=0, digits=6, drift=0) -> int:
"""
Implementation of the TOTP algorithm from `RFC 6238
<http://tools.ietf.org/html/rfc6238#section-4>`_.
:param bytes key: The shared secret. A 20-byte string is recommended.
:param int step: The time step in seconds. The time-based code changes
every ``step`` seconds.
:param int t0: The Unix time at which to start counting time steps.
:param int digits: The number of decimal digits to generate.
:param int drift: The number of time steps to add or remove. Delays and
clock differences might mean that you have to look back or forward a
step or two in order to match a token.
:returns: The TOTP token.
:rtype: int
>>> key = b'12345678901234567890'
>>> now = int(time())
>>> for delta in range(0, 200, 20):
... totp(key, t0=(now-delta))
755224
755224
287082
359152
359152
969429
338314
338314
254676
287922
"""
return TOTP(key, step, t0, digits, drift).token()
class TOTP:
"""
An alternate TOTP interface.
This provides access to intermediate steps of the computation. This is a
living object: the return values of ``t`` and ``token`` will change along
with other properties and with the passage of time.
:param bytes key: The shared secret. A 20-byte string is recommended.
:param int step: The time step in seconds. The time-based code changes
every ``step`` seconds.
:param int t0: The Unix time at which to start counting time steps.
:param int digits: The number of decimal digits to generate.
:param int drift: The number of time steps to add or remove. Delays and
clock differences might mean that you have to look back or forward a
step or two in order to match a token.
>>> key = b'12345678901234567890'
>>> totp = TOTP(key)
>>> totp.time = 0
>>> totp.t()
0
>>> totp.token()
755224
>>> totp.time = 30
>>> totp.t()
1
>>> totp.token()
287082
>>> totp.verify(287082)
True
>>> totp.verify(359152)
False
>>> totp.verify(359152, tolerance=1)
True
>>> totp.drift
1
>>> totp.drift = 0
>>> totp.verify(359152, tolerance=1, min_t=3)
False
>>> totp.drift
0
>>> del totp.time
>>> totp.t0 = int(time()) - 60
>>> totp.t()
2
>>> totp.token()
359152
"""
# pylint: disable=too-many-arguments
def __init__(self, key: bytes, step=30, t0=0, digits=6, drift=0):
self.key = key
self.step = step
self.t0 = t0
self.digits = digits
self.drift = drift
self._time = None
def token(self):
"""The computed TOTP token."""
return hotp(self.key, self.t(), digits=self.digits)
def t(self):
"""The computed time step."""
return ((int(self.time) - self.t0) // self.step) + self.drift
@property
def time(self):
"""
The current time.
By default, this returns time.time() each time it is accessed. If you
want to generate a token at a specific time, you can set this property
to a fixed value instead. Deleting the value returns it to its 'live'
state.
"""
return self._time if (self._time is not None) else time()
@time.setter
def time(self, value):
self._time = value
@time.deleter
def time(self):
self._time = None
def verify(self, token, tolerance=0, min_t=None):
"""
A high-level verification helper.
:param int token: The provided token.
:param int tolerance: The amount of clock drift you're willing to
accommodate, in steps. We'll look for the token at t values in
[t - tolerance, t + tolerance].
:param int min_t: The minimum t value we'll accept. As a rule, this
should be one larger than the largest t value of any previously
accepted token.
:rtype: bool
Iff this returns True, `self.drift` will be updated to reflect the
drift value that was necessary to match the token.
"""
drift_orig = self.drift
verified = False
for offset in range(-tolerance, tolerance + 1):
self.drift = drift_orig + offset
if (min_t is not None) and (self.t() < min_t):
continue
if self.token() == token:
verified = True
break
else:
self.drift = drift_orig
return verified

View file

@ -0,0 +1,220 @@
"""Base authenticator tests"""
from datetime import timedelta
from threading import Thread
from django.contrib.auth.models import AnonymousUser
from django.db import connection
from django.test import TestCase, TransactionTestCase
from django.test.utils import override_settings
from django.utils import timezone
from freezegun import freeze_time
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id
from authentik.stages.authenticator import match_token, user_has_device, verify_token
from authentik.stages.authenticator.models import Device, VerifyNotAllowed
class TestThread(Thread):
"Django testing quirk: threads have to close their DB connections."
__test__ = False
def run(self):
super().run()
connection.close()
class ThrottlingTestMixin:
"""
Generic tests for throttled devices.
Any concrete device implementation that uses throttling should define a
TestCase subclass that includes this as a base class. This will help verify
a correct integration of ThrottlingMixin.
Subclasses are responsible for populating self.device with a device to test
as well as implementing methods to generate tokens to test with.
"""
device: Device
def valid_token(self):
"""Returns a valid token to pass to our device under test."""
raise NotImplementedError()
def invalid_token(self):
"""Returns an invalid token to pass to our device under test."""
raise NotImplementedError()
#
# Tests
#
def test_delay_imposed_after_fail(self):
"""Test delay imposed after fail"""
verified1 = self.device.verify_token(self.invalid_token())
self.assertFalse(verified1)
verified2 = self.device.verify_token(self.valid_token())
self.assertFalse(verified2)
def test_delay_after_fail_expires(self):
"""Test delay after fail expires"""
verified1 = self.device.verify_token(self.invalid_token())
self.assertFalse(verified1)
with freeze_time() as frozen_time:
# With default settings initial delay is 1 second
frozen_time.tick(delta=timedelta(seconds=1.1))
verified2 = self.device.verify_token(self.valid_token())
self.assertTrue(verified2)
def test_throttling_failure_count(self):
"""Test throttling failure count"""
self.assertEqual(self.device.throttling_failure_count, 0)
for _ in range(0, 5):
self.device.verify_token(self.invalid_token())
# Only the first attempt will increase throttling_failure_count,
# the others will all be within 1 second of first
# and therefore not count as attempts.
self.assertEqual(self.device.throttling_failure_count, 1)
def test_verify_is_allowed(self):
"""Test verify allowed"""
# Initially should be allowed
verify_is_allowed1, data1 = self.device.verify_is_allowed()
self.assertEqual(verify_is_allowed1, True)
self.assertEqual(data1, None)
# After failure, verify is not allowed
with freeze_time():
self.device.verify_token(self.invalid_token())
verify_is_allowed2, data2 = self.device.verify_is_allowed()
self.assertEqual(verify_is_allowed2, False)
self.assertEqual(
data2,
{
"reason": VerifyNotAllowed.N_FAILED_ATTEMPTS,
"failure_count": 1,
"locked_until": timezone.now() + timezone.timedelta(seconds=1),
},
)
# After a successful attempt, should be allowed again
with freeze_time() as frozen_time:
frozen_time.tick(delta=timedelta(seconds=1.1))
self.device.verify_token(self.valid_token())
verify_is_allowed3, data3 = self.device.verify_is_allowed()
self.assertEqual(verify_is_allowed3, True)
self.assertEqual(data3, None)
@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class APITestCase(TestCase):
"""Test API"""
def setUp(self):
self.alice = create_test_admin_user("alice")
self.bob = create_test_admin_user("bob")
device = self.alice.staticdevice_set.create()
self.valid = generate_id(length=16)
device.token_set.create(token=self.valid)
def test_user_has_device(self):
"""Test user_has_device"""
with self.subTest(user="anonymous"):
self.assertFalse(user_has_device(AnonymousUser()))
with self.subTest(user="alice"):
self.assertTrue(user_has_device(self.alice))
with self.subTest(user="bob"):
self.assertFalse(user_has_device(self.bob))
def test_verify_token(self):
"""Test verify_token"""
device = self.alice.staticdevice_set.first()
verified = verify_token(self.alice, device.persistent_id, "bogus")
self.assertIsNone(verified)
verified = verify_token(self.alice, device.persistent_id, self.valid)
self.assertIsNotNone(verified)
def test_match_token(self):
"""Test match_token"""
verified = match_token(self.alice, "bogus")
self.assertIsNone(verified)
verified = match_token(self.alice, self.valid)
self.assertEqual(verified, self.alice.staticdevice_set.first())
@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class ConcurrencyTestCase(TransactionTestCase):
"""Test concurrent verifications"""
def setUp(self):
self.alice = create_test_admin_user("alice")
self.bob = create_test_admin_user("bob")
self.valid = generate_id(length=16)
for user in [self.alice, self.bob]:
device = user.staticdevice_set.create()
device.token_set.create(token=self.valid)
def test_verify_token(self):
"""Test verify_token in a thread"""
class VerifyThread(Thread):
"""Verifier thread"""
__test__ = False
def __init__(self, user, device_id, token):
super().__init__()
self.user = user
self.device_id = device_id
self.token = token
self.verified = None
def run(self):
self.verified = verify_token(self.user, self.device_id, self.token)
connection.close()
device = self.alice.staticdevice_set.get()
threads = [VerifyThread(device.user, device.persistent_id, self.valid) for _ in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertEqual(sum(1 for t in threads if t.verified is not None), 1)
def test_match_token(self):
"""Test match_token in a thread"""
class VerifyThread(Thread):
"""Verifier thread"""
__test__ = False
def __init__(self, user, token):
super().__init__()
self.user = user
self.token = token
self.verified = None
def run(self):
self.verified = match_token(self.user, self.token)
connection.close()
threads = [VerifyThread(self.alice, self.valid) for _ in range(10)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
self.assertEqual(sum(1 for t in threads if t.verified is not None), 1)

View file

@ -0,0 +1,86 @@
"""Authenticator utils"""
import random
import string
from binascii import unhexlify
from os import urandom
from django.core.exceptions import ValidationError
def hex_validator(length=0):
"""
Returns a function to be used as a model validator for a hex-encoded
CharField. This is useful for secret keys of all kinds::
def key_validator(value):
return hex_validator(20)(value)
key = models.CharField(max_length=40,
validators=[key_validator], help_text='A hex-encoded 20-byte secret key')
:param int length: If greater than 0, validation will fail unless the
decoded value is exactly this number of bytes.
:rtype: function
>>> hex_validator()('0123456789abcdef')
>>> hex_validator(8)(b'0123456789abcdef')
>>> hex_validator()('phlebotinum') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValidationError: ['phlebotinum is not valid hex-encoded data.']
>>> hex_validator(9)('0123456789abcdef') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValidationError: ['0123456789abcdef does not represent exactly 9 bytes.']
"""
def _validator(value):
try:
if isinstance(value, str):
value = value.encode()
unhexlify(value)
except Exception:
raise ValidationError("{0} is not valid hex-encoded data.".format(value))
if (length > 0) and (len(value) != length * 2):
raise ValidationError("{0} does not represent exactly {1} bytes.".format(value, length))
return _validator
def random_hex(length=20):
"""
Returns a string of random bytes encoded as hex.
This uses :func:`os.urandom`, so it should be suitable for generating
cryptographic keys.
:param int length: The number of (decoded) bytes to return.
:returns: A string of hex digits.
:rtype: str
"""
return urandom(length).hex()
def random_number_token(length=6):
"""
Returns a string of random digits encoded as string.
:param int length: The number of digits to return.
:returns: A string of decimal digits.
:rtype: str
"""
rand = random.SystemRandom()
if hasattr(rand, "choices"):
digits = rand.choices(string.digits, k=length)
else:
digits = (rand.choice(string.digits) for i in range(length))
return "".join(digits)

View file

@ -5,7 +5,6 @@ from django.contrib.auth import get_user_model
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.views import View
from django_otp.models import Device
from duo_client.admin import Admin
from duo_client.auth import Auth
from rest_framework.serializers import BaseSerializer, Serializer
@ -14,6 +13,7 @@ from authentik.core.types import UserSettingSerializer
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.lib.utils.http import authentik_user_agent
from authentik.stages.authenticator.models import Device
class AuthenticatorDuoStage(ConfigurableStage, FriendlyNamedStage, Stage):

View file

@ -6,7 +6,6 @@ from django.contrib.auth import get_user_model
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.views import View
from django_otp.models import SideChannelDevice
from requests.exceptions import RequestException
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import BaseSerializer
@ -21,6 +20,7 @@ from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.lib.utils.errors import exception_to_string
from authentik.lib.utils.http import get_http_session
from authentik.stages.authenticator.models import SideChannelDevice
LOGGER = get_logger()

View file

@ -1,6 +1,5 @@
"""AuthenticatorStaticStage API Views"""
from django_filters.rest_framework import DjangoFilterBackend
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAdminUser
@ -10,7 +9,11 @@ from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.flows.api.stages import StageSerializer
from authentik.stages.authenticator_static.models import AuthenticatorStaticStage
from authentik.stages.authenticator_static.models import (
AuthenticatorStaticStage,
StaticDevice,
StaticToken,
)
class AuthenticatorStaticStageSerializer(StageSerializer):

View file

@ -0,0 +1,70 @@
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_static",
"0007_authenticatorstaticstage_token_length_and_more",
),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="StaticDevice",
fields=[
(
"id",
models.AutoField(
verbose_name="ID", serialize=False, auto_created=True, primary_key=True
),
),
(
"name",
models.CharField(
help_text="The human-readable name of this device.", max_length=64
),
),
(
"confirmed",
models.BooleanField(default=True, help_text="Is this device ready for use?"),
),
(
"user",
models.ForeignKey(
help_text="The user that this device belongs to.",
to=settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
),
),
],
options={
"abstract": False,
},
bases=(models.Model,),
),
migrations.CreateModel(
name="StaticToken",
fields=[
(
"id",
models.AutoField(
verbose_name="ID", serialize=False, auto_created=True, primary_key=True
),
),
("token", models.CharField(max_length=16, db_index=True)),
(
"device",
models.ForeignKey(
related_name="token_set",
to="authentik_stages_authenticator_static.staticdevice",
on_delete=models.CASCADE,
),
),
],
options={},
bases=(models.Model,),
),
]

View file

@ -0,0 +1,33 @@
# Generated by Django 3.0.5 on 2020-04-16 13:41
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_stages_authenticator_static", "0008_initial"),
]
operations = [
migrations.AddField(
model_name="staticdevice",
name="throttling_failure_count",
field=models.PositiveIntegerField(
default=0, help_text="Number of successive failed attempts."
),
),
migrations.AddField(
model_name="staticdevice",
name="throttling_failure_timestamp",
field=models.DateTimeField(
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
null=True,
),
),
migrations.AlterModelOptions(
name="staticdevice",
options={"verbose_name": "Static device", "verbose_name_plural": "Static devices"},
),
]

View file

@ -1,6 +1,9 @@
"""Static Authenticator models"""
from base64 import b32encode
from os import urandom
from typing import Optional
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.views import View
@ -8,6 +11,8 @@ from rest_framework.serializers import BaseSerializer
from authentik.core.types import UserSettingSerializer
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.stages.authenticator.models import Device, ThrottlingMixin
class AuthenticatorStaticStage(ConfigurableStage, FriendlyNamedStage, Stage):
@ -46,3 +51,76 @@ class AuthenticatorStaticStage(ConfigurableStage, FriendlyNamedStage, Stage):
class Meta:
verbose_name = _("Static Authenticator Stage")
verbose_name_plural = _("Static Authenticator Stages")
class StaticDevice(SerializerModel, ThrottlingMixin, Device):
"""
A static :class:`~authentik.stages.authenticator.models.Device` simply consists of random
tokens shared by the database and the user.
These are frequently used as emergency tokens in case a user's normal
device is lost or unavailable. They can be consumed in any order; each
token will be removed from the database as soon as it is used.
This model has no fields of its own, but serves as a container for
:class:`StaticToken` objects.
.. attribute:: token_set
The RelatedManager for our tokens.
"""
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_static.api import StaticDeviceSerializer
return StaticDeviceSerializer
def get_throttle_factor(self):
return getattr(settings, "OTP_STATIC_THROTTLE_FACTOR", 1)
def verify_token(self, token):
verify_allowed, _ = self.verify_is_allowed()
if verify_allowed:
match = self.token_set.filter(token=token).first()
if match is not None:
match.delete()
self.throttle_reset()
else:
self.throttle_increment()
else:
match = None
return match is not None
class Meta(Device.Meta):
verbose_name = _("Static device")
verbose_name_plural = _("Static devices")
class StaticToken(models.Model):
"""
A single token belonging to a :class:`StaticDevice`.
.. attribute:: device
*ForeignKey*: A foreign key to :class:`StaticDevice`.
.. attribute:: token
*CharField*: A random string up to 16 characters.
"""
device = models.ForeignKey(StaticDevice, related_name="token_set", on_delete=models.CASCADE)
token = models.CharField(max_length=16, db_index=True)
@staticmethod
def random_token():
"""
Returns a new random string that can be used as a static token.
:rtype: bytes
"""
return b32encode(urandom(5)).decode("utf-8").lower()

View file

@ -1,5 +0,0 @@
"""Static Authenticator settings"""
INSTALLED_APPS = [
"django_otp.plugins.otp_static",
]

View file

@ -1,9 +1,9 @@
"""totp authenticator signals"""
from django.db.models.signals import pre_delete
from django.dispatch import receiver
from django_otp.plugins.otp_static.models import StaticDevice
from authentik.events.models import Event
from authentik.stages.authenticator_static.models import StaticDevice
@receiver(pre_delete, sender=StaticDevice)

View file

@ -1,12 +1,15 @@
"""Static OTP Setup stage"""
from django.http import HttpRequest, HttpResponse
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from rest_framework.fields import CharField, ListField
from authentik.flows.challenge import ChallengeResponse, ChallengeTypes, WithUserInfoChallenge
from authentik.flows.stage import ChallengeStageView
from authentik.lib.generators import generate_id
from authentik.stages.authenticator_static.models import AuthenticatorStaticStage
from authentik.stages.authenticator_static.models import (
AuthenticatorStaticStage,
StaticDevice,
StaticToken,
)
SESSION_STATIC_DEVICE = "static_device"
SESSION_STATIC_TOKENS = "static_device_tokens"

View file

@ -1,9 +1,13 @@
"""Test Static API"""
from django.test.utils import override_settings
from django.urls import reverse
from django_otp.plugins.otp_static.models import StaticDevice
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.lib.generators import generate_id
from authentik.stages.authenticator.tests import TestCase, ThrottlingTestMixin
from authentik.stages.authenticator_static.models import StaticDevice
class AuthenticatorStaticStageTests(APITestCase):
@ -18,3 +22,42 @@ class AuthenticatorStaticStageTests(APITestCase):
reverse("authentik_api:staticdevice-detail", kwargs={"pk": dev.pk})
)
self.assertEqual(response.status_code, 204)
class DeviceTest(TestCase):
"""A few generic tests to get us started."""
def setUp(self):
self.user = create_test_admin_user("alice")
def test_str(self):
"""Test __str__ of model"""
device = StaticDevice.objects.create(user=self.user, name="Device")
str(device)
def test_str_unpopulated(self):
"""Test __str__ of model"""
device = StaticDevice()
str(device)
@override_settings(
OTP_STATIC_THROTTLE_FACTOR=1,
)
class ThrottlingTestCase(ThrottlingTestMixin, TestCase):
"""Test static device throttling"""
def setUp(self):
user = create_test_admin_user("alice")
self.device = user.staticdevice_set.create()
self.device.token_set.create(token=generate_id(length=16))
self.device.token_set.create(token=generate_id(length=16))
self.device.token_set.create(token=generate_id(length=16))
def valid_token(self):
return self.device.token_set.first().token
def invalid_token(self):
return "bogus"

View file

@ -1,6 +1,5 @@
"""AuthenticatorTOTPStage API Views"""
from django_filters.rest_framework.backends import DjangoFilterBackend
from django_otp.plugins.otp_totp.models import TOTPDevice
from rest_framework import mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAdminUser
@ -10,7 +9,7 @@ from rest_framework.viewsets import GenericViewSet, ModelViewSet
from authentik.api.authorization import OwnerFilter, OwnerPermissions
from authentik.core.api.used_by import UsedByMixin
from authentik.flows.api.stages import StageSerializer
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage, TOTPDevice
class AuthenticatorTOTPStageSerializer(StageSerializer):

View file

@ -0,0 +1,98 @@
from django.conf import settings
from django.db import migrations, models
import authentik.stages.authenticator_totp.models
class Migration(migrations.Migration):
dependencies = [
("authentik_stages_authenticator_totp", "0007_authenticatortotpstage_friendly_name"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="TOTPDevice",
fields=[
(
"id",
models.AutoField(
verbose_name="ID", serialize=False, auto_created=True, primary_key=True
),
),
(
"name",
models.CharField(
help_text="The human-readable name of this device.", max_length=64
),
),
(
"confirmed",
models.BooleanField(default=True, help_text="Is this device ready for use?"),
),
(
"key",
models.CharField(
default=authentik.stages.authenticator_totp.models.default_key,
help_text="A hex-encoded secret key of up to 40 bytes.",
max_length=80,
validators=[authentik.stages.authenticator_totp.models.key_validator],
),
),
(
"step",
models.PositiveSmallIntegerField(
default=30, help_text="The time step in seconds."
),
),
(
"t0",
models.BigIntegerField(
default=0, help_text="The Unix time at which to begin counting steps."
),
),
(
"digits",
models.PositiveSmallIntegerField(
default=6,
help_text="The number of digits to expect in a token.",
choices=[(6, 6), (8, 8)],
),
),
(
"tolerance",
models.PositiveSmallIntegerField(
default=1,
help_text="The number of time steps in the past or future to allow.",
),
),
(
"drift",
models.SmallIntegerField(
default=0,
help_text="The number of time steps the prover is known to deviate from our clock.",
),
),
(
"last_t",
models.BigIntegerField(
default=-1,
help_text="The t value of the latest verified token. The next token must be at a higher time step.",
),
),
(
"user",
models.ForeignKey(
help_text="The user that this device belongs to.",
to=settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
),
),
],
options={
"abstract": False,
"verbose_name": "TOTP device",
},
bases=(models.Model,),
),
]

View file

@ -0,0 +1,29 @@
# Generated by Django 2.2 on 2019-04-20 12:23
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_stages_authenticator_totp", "0008_initial"),
]
operations = [
migrations.AddField(
model_name="totpdevice",
name="throttling_failure_count",
field=models.PositiveIntegerField(
default=0, help_text="Number of successive failed attempts."
),
),
migrations.AddField(
model_name="totpdevice",
name="throttling_failure_timestamp",
field=models.DateTimeField(
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
null=True,
),
),
]

View file

@ -0,0 +1,28 @@
# Generated by Django 4.2.4 on 2023-09-03 00:55
from django.db import migrations, models
import authentik.stages.authenticator_totp.models
class Migration(migrations.Migration):
dependencies = [
("authentik_stages_authenticator_totp", "0009_auto_20190420_0723"),
]
operations = [
migrations.AlterField(
model_name="totpdevice",
name="key",
field=models.CharField(
default=authentik.stages.authenticator_totp.models.default_key,
help_text="A hex-encoded secret key of up to 40 bytes.",
max_length=80,
validators=[authentik.stages.authenticator_totp.models.key_validator],
),
),
migrations.AlterModelOptions(
name="totpdevice",
options={"verbose_name": "TOTP device", "verbose_name_plural": "TOTP devices"},
),
]

View file

@ -1,6 +1,11 @@
"""OTP Time-based models"""
import time
from base64 import b32encode
from binascii import unhexlify
from typing import Optional
from urllib.parse import quote, urlencode
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _
from django.views import View
@ -8,10 +13,14 @@ from rest_framework.serializers import BaseSerializer
from authentik.core.types import UserSettingSerializer
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.stages.authenticator.models import Device, ThrottlingMixin
from authentik.stages.authenticator.oath import TOTP
from authentik.stages.authenticator.util import hex_validator, random_hex
class TOTPDigits(models.IntegerChoices):
"""OTP Time Digits"""
"""OTP Time Digits"""
SIX = 6, _("6 digits, widely compatible")
EIGHT = 8, _("8 digits, not compatible with apps like Google Authenticator")
@ -52,3 +61,185 @@ class AuthenticatorTOTPStage(ConfigurableStage, FriendlyNamedStage, Stage):
class Meta:
verbose_name = _("TOTP Authenticator Setup Stage")
verbose_name_plural = _("TOTP Authenticator Setup Stages")
def default_key():
"""Default TOTP Device key"""
return random_hex(20)
def key_validator(value):
"""Validate totp key"""
return hex_validator()(value)
class TOTPDevice(SerializerModel, ThrottlingMixin, Device):
"""
A generic TOTP :class:`~authentik.stages.authenticator.models.Device`. The model fields mostly
correspond to the arguments to :func:`authentik.stages.authenticator.oath.totp`. They all have
sensible defaults, including the key, which is randomly generated.
.. attribute:: key
*CharField*: A hex-encoded secret key of up to 40 bytes. (Default: 20
random bytes)
.. attribute:: step
*PositiveSmallIntegerField*: The time step in seconds. (Default: 30)
.. attribute:: t0
*BigIntegerField*: The Unix time at which to begin counting steps.
(Default: 0)
.. attribute:: digits
*PositiveSmallIntegerField*: The number of digits to expect in a token
(6 or 8). (Default: 6)
.. attribute:: tolerance
*PositiveSmallIntegerField*: The number of time steps in the past or
future to allow. For example, if this is 1, we'll accept any of three
tokens: the current one, the previous one, and the next one. (Default:
1)
.. attribute:: drift
*SmallIntegerField*: The number of time steps the prover is known to
deviate from our clock. If :setting:`OTP_TOTP_SYNC` is ``True``, we'll
update this any time we match a token that is not the current one.
(Default: 0)
.. attribute:: last_t
*BigIntegerField*: The time step of the last verified token. To avoid
verifying the same token twice, this will be updated on each successful
verification. Only tokens at a higher time step will be verified
subsequently. (Default: -1)
"""
key = models.CharField(
max_length=80,
validators=[key_validator],
default=default_key,
help_text="A hex-encoded secret key of up to 40 bytes.",
)
step = models.PositiveSmallIntegerField(default=30, help_text="The time step in seconds.")
t0 = models.BigIntegerField(
default=0, help_text="The Unix time at which to begin counting steps."
)
digits = models.PositiveSmallIntegerField(
choices=[(6, 6), (8, 8)],
default=6,
help_text="The number of digits to expect in a token.",
)
tolerance = models.PositiveSmallIntegerField(
default=1, help_text="The number of time steps in the past or future to allow."
)
drift = models.SmallIntegerField(
default=0,
help_text="The number of time steps the prover is known to deviate from our clock.",
)
last_t = models.BigIntegerField(
default=-1,
help_text=(
"The t value of the latest verified token. "
"The next token must be at a higher time step."
),
)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_totp.api import TOTPDeviceSerializer
return TOTPDeviceSerializer
@property
def bin_key(self):
"""
The secret key as a binary string.
"""
return unhexlify(self.key.encode())
def verify_token(self, token):
otp_totp_sync = getattr(settings, "OTP_TOTP_SYNC", True)
verify_allowed, _ = self.verify_is_allowed()
if not verify_allowed:
return False
try:
token = int(token)
except ValueError:
verified = False
else:
key = self.bin_key
totp = TOTP(key, self.step, self.t0, self.digits, self.drift)
totp.time = time.time()
verified = totp.verify(token, self.tolerance, self.last_t + 1)
if verified:
self.last_t = totp.t()
if otp_totp_sync:
self.drift = totp.drift
self.throttle_reset(commit=False)
self.save()
if not verified:
self.throttle_increment(commit=True)
return verified
def get_throttle_factor(self):
return getattr(settings, "OTP_TOTP_THROTTLE_FACTOR", 1)
@property
def config_url(self):
"""
A URL for configuring Google Authenticator or similar.
See https://github.com/google/google-authenticator/wiki/Key-Uri-Format.
The issuer is taken from :setting:`OTP_TOTP_ISSUER`, if available.
The image (for e.g. FreeOTP) is taken from :setting:`OTP_TOTP_IMAGE`, if available.
"""
label = str(self.user.username)
params = {
"secret": b32encode(self.bin_key),
"algorithm": "SHA1",
"digits": self.digits,
"period": self.step,
}
urlencoded_params = urlencode(params)
issuer = self._read_str_from_settings("OTP_TOTP_ISSUER")
if issuer:
issuer = issuer.replace(":", "")
label = "{}:{}".format(issuer, label)
urlencoded_params += "&issuer={}".format(
quote(issuer)
) # encode issuer as per RFC 3986, not quote_plus
image = self._read_str_from_settings("OTP_TOTP_IMAGE")
if image:
urlencoded_params += "&image={}".format(quote(image, safe=":/"))
url = "otpauth://totp/{}?{}".format(quote(label), urlencoded_params)
return url
def _read_str_from_settings(self, key):
val = getattr(settings, key, None)
if callable(val):
val = val(self)
if isinstance(val, str) and (val != ""):
return val
return None
class Meta(Device.Meta):
verbose_name = _("TOTP device")
verbose_name_plural = _("TOTP devices")

View file

@ -1,6 +1,3 @@
"""OTP Time"""
INSTALLED_APPS = [
"django_otp.plugins.otp_totp",
]
OTP_TOTP_ISSUER = "__to_replace__"

View file

@ -4,7 +4,6 @@ from urllib.parse import quote
from django.http import HttpRequest, HttpResponse
from django.http.request import QueryDict
from django.utils.translation import gettext_lazy as _
from django_otp.plugins.otp_totp.models import TOTPDevice
from rest_framework.fields import CharField, IntegerField
from rest_framework.serializers import ValidationError
@ -15,7 +14,7 @@ from authentik.flows.challenge import (
WithUserInfoChallenge,
)
from authentik.flows.stage import ChallengeStageView
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage, TOTPDevice
from authentik.stages.authenticator_totp.settings import OTP_TOTP_ISSUER
SESSION_TOTP_DEVICE = "totp_device"

View file

@ -1,9 +1,15 @@
"""Test TOTP API"""
from time import time
from urllib.parse import parse_qs, urlsplit
from django.test.utils import override_settings
from django.urls import reverse
from django_otp.plugins.otp_totp.models import TOTPDevice
from rest_framework.test import APITestCase
from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.stages.authenticator.tests import TestCase, ThrottlingTestMixin
from authentik.stages.authenticator_totp.models import TOTPDevice
class AuthenticatorTOTPStage(APITestCase):
@ -18,3 +24,180 @@ class AuthenticatorTOTPStage(APITestCase):
reverse("authentik_api:totpdevice-detail", kwargs={"pk": dev.pk})
)
self.assertEqual(response.status_code, 204)
class TOTPDeviceMixin:
"""
A TestCase helper that gives us a TOTPDevice to work with.
"""
# The next ten tokens
tokens = [
179225,
656163,
839400,
154567,
346912,
471576,
45675,
101397,
491039,
784503,
]
# pylint: disable=invalid-name
def setUp(self):
"""
Create a device at the fourth time step. The current token is 154567.
"""
self.alice = create_test_admin_user("alice", email="alice@example.com")
self.device = self.alice.totpdevice_set.create(
key="2a2bbba1092ffdd25a328ad1a0a5f5d61d7aacc4",
step=30,
t0=int(time() - (30 * 3)),
digits=6,
tolerance=0,
drift=0,
)
@override_settings(
OTP_TOTP_SYNC=False,
OTP_TOTP_THROTTLE_FACTOR=0,
)
class TOTPTest(TOTPDeviceMixin, TestCase):
"""TOTP tests"""
def test_default_key(self):
"""Ensure default_key is valid"""
device = self.alice.totpdevice_set.create()
# Make sure we can decode the key.
_ = device.bin_key
def test_single(self):
"""Test single token"""
results = [self.device.verify_token(token) for token in self.tokens]
self.assertEqual(results, [False] * 3 + [True] + [False] * 6)
def test_tolerance(self):
"""Test tolerance"""
self.device.tolerance = 1
results = [self.device.verify_token(token) for token in self.tokens]
self.assertEqual(results, [False] * 2 + [True] * 3 + [False] * 5)
def test_drift(self):
"""Test drift"""
self.device.tolerance = 1
self.device.drift = -1
results = [self.device.verify_token(token) for token in self.tokens]
self.assertEqual(results, [False] * 1 + [True] * 3 + [False] * 6)
def test_sync_drift(self):
"""Test sync drift"""
self.device.tolerance = 2
with self.settings(OTP_TOTP_SYNC=True):
valid = self.device.verify_token(self.tokens[5])
self.assertTrue(valid)
self.assertEqual(self.device.drift, 2)
def test_no_reuse(self):
"""Test reuse"""
verified1 = self.device.verify_token(self.tokens[3])
verified2 = self.device.verify_token(self.tokens[3])
self.assertEqual(self.device.last_t, 3)
self.assertTrue(verified1)
self.assertFalse(verified2)
def test_config_url(self):
"""Test config_url"""
with override_settings(OTP_TOTP_ISSUER=None):
url = self.device.config_url
parsed = urlsplit(url)
params = parse_qs(parsed.query)
self.assertEqual(parsed.scheme, "otpauth")
self.assertEqual(parsed.netloc, "totp")
self.assertEqual(parsed.path, "/alice")
self.assertIn("secret", params)
self.assertNotIn("issuer", params)
def test_config_url_issuer(self):
"""Test config_url issuer"""
with override_settings(OTP_TOTP_ISSUER="example.com"):
url = self.device.config_url
parsed = urlsplit(url)
params = parse_qs(parsed.query)
self.assertEqual(parsed.scheme, "otpauth")
self.assertEqual(parsed.netloc, "totp")
self.assertEqual(parsed.path, "/example.com%3Aalice")
self.assertIn("secret", params)
self.assertIn("issuer", params)
self.assertEqual(params["issuer"][0], "example.com")
def test_config_url_issuer_spaces(self):
"""Test config_url issuer with spaces"""
with override_settings(OTP_TOTP_ISSUER="Very Trustworthy Source"):
url = self.device.config_url
parsed = urlsplit(url)
params = parse_qs(parsed.query)
self.assertEqual(parsed.scheme, "otpauth")
self.assertEqual(parsed.netloc, "totp")
self.assertEqual(parsed.path, "/Very%20Trustworthy%20Source%3Aalice")
self.assertIn("secret", params)
self.assertIn("issuer", params)
self.assertEqual(params["issuer"][0], "Very Trustworthy Source")
def test_config_url_issuer_method(self):
"""Test config_url issuer method"""
with override_settings(OTP_TOTP_ISSUER=lambda d: d.user.email):
url = self.device.config_url
parsed = urlsplit(url)
params = parse_qs(parsed.query)
self.assertEqual(parsed.scheme, "otpauth")
self.assertEqual(parsed.netloc, "totp")
self.assertEqual(parsed.path, "/alice%40example.com%3Aalice")
self.assertIn("secret", params)
self.assertIn("issuer", params)
self.assertEqual(params["issuer"][0], "alice@example.com")
def test_config_url_image(self):
"""Test config_url with image"""
image_url = "https://test.invalid/square.png"
with override_settings(OTP_TOTP_ISSUER=None, OTP_TOTP_IMAGE=image_url):
url = self.device.config_url
parsed = urlsplit(url)
params = parse_qs(parsed.query)
self.assertEqual(parsed.scheme, "otpauth")
self.assertEqual(parsed.netloc, "totp")
self.assertEqual(parsed.path, "/alice")
self.assertIn("secret", params)
self.assertEqual(params["image"][0], image_url)
@override_settings(
OTP_TOTP_THROTTLE_FACTOR=1,
)
class ThrottlingTestCase(TOTPDeviceMixin, ThrottlingTestMixin, TestCase):
"""Test TOTP Throttling"""
def valid_token(self):
return self.tokens[3]
def invalid_token(self):
return -1

View file

@ -7,8 +7,6 @@ from django.http.response import Http404
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext as __
from django.utils.translation import gettext_lazy as _
from django_otp import match_token
from django_otp.models import Device
from rest_framework.fields import CharField, JSONField
from rest_framework.serializers import ValidationError
from structlog.stdlib import get_logger
@ -25,6 +23,8 @@ from authentik.events.models import Event, EventAction
from authentik.flows.stage import StageView
from authentik.flows.views.executor import SESSION_KEY_APPLICATION_PRE
from authentik.lib.utils.http import get_client_ip
from authentik.stages.authenticator import match_token
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
from authentik.stages.authenticator_sms.models import SMSDevice
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses

View file

@ -1,4 +0,0 @@
"""OTP Validate stage settings"""
INSTALLED_APPS = [
"django_otp",
]

View file

@ -5,8 +5,6 @@ from typing import Optional
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django_otp import devices_for_user
from django_otp.models import Device
from jwt import PyJWTError, decode, encode
from rest_framework.fields import CharField, IntegerField, JSONField, ListField, UUIDField
from rest_framework.serializers import ValidationError
@ -21,6 +19,8 @@ from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
from authentik.flows.stage import ChallengeStageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.install_id import get_install_id
from authentik.stages.authenticator import devices_for_user
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator_sms.models import SMSDevice
from authentik.stages.authenticator_validate.challenge import (
DeviceChallenge,

View file

@ -5,8 +5,6 @@ from time import sleep
from django.test.client import RequestFactory
from django.urls.base import reverse
from django_otp.oath import TOTP
from django_otp.plugins.otp_totp.models import TOTPDevice
from jwt import encode
from rest_framework.exceptions import ValidationError
@ -17,6 +15,8 @@ from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import FlowExecutorView
from authentik.lib.generators import generate_id
from authentik.root.install_id import get_install_id
from authentik.stages.authenticator.oath import TOTP
from authentik.stages.authenticator_totp.models import TOTPDevice
from authentik.stages.authenticator_validate.challenge import (
get_challenge_for_device,
validate_challenge_code,

View file

@ -6,7 +6,6 @@ from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django.views import View
from django_otp.models import Device
from rest_framework.serializers import BaseSerializer, Serializer
from webauthn.helpers.base64url_to_bytes import base64url_to_bytes
from webauthn.helpers.structs import PublicKeyCredentialDescriptor
@ -14,6 +13,7 @@ from webauthn.helpers.structs import PublicKeyCredentialDescriptor
from authentik.core.types import UserSettingSerializer
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.stages.authenticator.models import Device
class UserVerification(models.TextChoices):

View file

@ -1625,6 +1625,42 @@
}
}
},
{
"type": "object",
"required": [
"model",
"identifiers"
],
"properties": {
"model": {
"const": "authentik_stages_authenticator_static.staticdevice"
},
"id": {
"type": "string"
},
"state": {
"type": "string",
"enum": [
"absent",
"present",
"created"
],
"default": "present"
},
"conditions": {
"type": "array",
"items": {
"type": "boolean"
}
},
"attrs": {
"$ref": "#/$defs/model_authentik_stages_authenticator_static.staticdevice"
},
"identifiers": {
"$ref": "#/$defs/model_authentik_stages_authenticator_static.staticdevice"
}
}
},
{
"type": "object",
"required": [
@ -1661,6 +1697,42 @@
}
}
},
{
"type": "object",
"required": [
"model",
"identifiers"
],
"properties": {
"model": {
"const": "authentik_stages_authenticator_totp.totpdevice"
},
"id": {
"type": "string"
},
"state": {
"type": "string",
"enum": [
"absent",
"present",
"created"
],
"default": "present"
},
"conditions": {
"type": "array",
"items": {
"type": "boolean"
}
},
"attrs": {
"$ref": "#/$defs/model_authentik_stages_authenticator_totp.totpdevice"
},
"identifiers": {
"$ref": "#/$defs/model_authentik_stages_authenticator_totp.totpdevice"
}
}
},
{
"type": "object",
"required": [
@ -3232,6 +3304,7 @@
"authentik.sources.oauth",
"authentik.sources.plex",
"authentik.sources.saml",
"authentik.stages.authenticator",
"authentik.stages.authenticator_duo",
"authentik.stages.authenticator_sms",
"authentik.stages.authenticator_static",
@ -3310,7 +3383,9 @@
"authentik_stages_authenticator_sms.authenticatorsmsstage",
"authentik_stages_authenticator_sms.smsdevice",
"authentik_stages_authenticator_static.authenticatorstaticstage",
"authentik_stages_authenticator_static.staticdevice",
"authentik_stages_authenticator_totp.authenticatortotpstage",
"authentik_stages_authenticator_totp.totpdevice",
"authentik_stages_authenticator_validate.authenticatorvalidatestage",
"authentik_stages_authenticator_webauthn.authenticatewebauthnstage",
"authentik_stages_authenticator_webauthn.webauthndevice",
@ -5872,6 +5947,19 @@
},
"required": []
},
"model_authentik_stages_authenticator_static.staticdevice": {
"type": "object",
"properties": {
"name": {
"type": "string",
"maxLength": 64,
"minLength": 1,
"title": "Name",
"description": "The human-readable name of this device."
}
},
"required": []
},
"model_authentik_stages_authenticator_totp.authenticatortotpstage": {
"type": "object",
"properties": {
@ -5986,6 +6074,19 @@
},
"required": []
},
"model_authentik_stages_authenticator_totp.totpdevice": {
"type": "object",
"properties": {
"name": {
"type": "string",
"maxLength": 64,
"minLength": 1,
"title": "Name",
"description": "The human-readable name of this device."
}
},
"required": []
},
"model_authentik_stages_authenticator_validate.authenticatorvalidatestage": {
"type": "object",
"properties": {

View file

@ -0,0 +1,47 @@
# flake8: noqa
from os import system
from lifecycle.migrate import BaseMigration
SQL_STATEMENT = """
BEGIN TRANSACTION;
DELETE FROM django_migrations WHERE app = 'otp_static';
DELETE FROM django_migrations WHERE app = 'otp_totp';
-- Rename tables (static)
ALTER TABLE otp_static_staticdevice RENAME TO authentik_stages_authenticator_static_staticdevice;
ALTER TABLE otp_static_statictoken RENAME TO authentik_stages_authenticator_static_statictoken;
ALTER SEQUENCE otp_static_statictoken_id_seq RENAME TO authentik_stages_authenticator_static_statictoken_id_seq;
ALTER SEQUENCE otp_static_staticdevice_id_seq RENAME TO authentik_stages_authenticator_static_staticdevice_id_seq;
-- Rename tables (totp)
ALTER TABLE otp_totp_totpdevice RENAME TO authentik_stages_authenticator_totp_totpdevice;
ALTER SEQUENCE otp_totp_totpdevice_id_seq RENAME TO authentik_stages_authenticator_totp_totpdevice_id_seq;
COMMIT;"""
class Migration(BaseMigration):
def needs_migration(self) -> bool:
self.cur.execute(
"select * from information_schema.tables WHERE table_name='otp_static_staticdevice'"
)
return bool(self.cur.rowcount)
def system_crit(self, command):
retval = system(command) # nosec
if retval != 0:
raise Exception("Migration error")
def run(self):
self.cur.execute(SQL_STATEMENT)
self.con.commit()
self.system_crit(
"./manage.py migrate authentik_stages_authenticator_static 0008_initial --fake"
)
self.system_crit(
"./manage.py migrate authentik_stages_authenticator_static 0009_throttling --fake"
)
self.system_crit(
"./manage.py migrate authentik_stages_authenticator_totp 0008_initial --fake"
)
self.system_crit(
"./manage.py migrate authentik_stages_authenticator_totp 0009_auto_20190420_0723 --fake"
)

33
poetry.lock generated
View file

@ -1185,23 +1185,6 @@ files = [
[package.dependencies]
Django = ">=3.2"
[[package]]
name = "django-otp"
version = "1.2.2"
description = "A pluggable framework for adding two-factor authentication to Django using one-time passwords."
optional = false
python-versions = ">=3.7"
files = [
{file = "django_otp-1.2.2-py3-none-any.whl", hash = "sha256:90765d5dac238a719f9550ac05681dd6307f513a81a10b6adb879b4abc6bc1a3"},
{file = "django_otp-1.2.2.tar.gz", hash = "sha256:007a6354dabb3a1a54574bf73abf045ebbde0bb8734a38e2ed7845ba450f345e"},
]
[package.dependencies]
django = ">=3.2"
[package.extras]
qrcode = ["qrcode"]
[[package]]
name = "django-prometheus"
version = "2.3.1"
@ -1458,6 +1441,20 @@ prometheus-client = ">=0.8.0"
pytz = "*"
tornado = ">=5.0.0,<7.0.0"
[[package]]
name = "freezegun"
version = "1.2.2"
description = "Let your Python tests travel through time"
optional = false
python-versions = ">=3.6"
files = [
{file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"},
{file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"},
]
[package.dependencies]
python-dateutil = ">=2.7"
[[package]]
name = "frozenlist"
version = "1.4.0"
@ -4456,4 +4453,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "8604e4dac9b0dcc55daccab83d4c182981d21201ef9901cf9c1acdc24288f979"
content-hash = "e6b1df989cb5c50609540c1229d05d8458ef1cc343fb5868402db8b7679ad73c"

View file

@ -95,7 +95,7 @@ extension-pkg-whitelist = ["lxml", "xmlsec"]
# Allow constants to be shorter than normal (and lowercase, for settings.py)
const-rgx = "[a-zA-Z0-9_]{1,40}$"
ignored-modules = ["django-otp", "binascii", "socket", "zlib"]
ignored-modules = ["binascii", "socket", "zlib"]
generated-members = ["xmlsec.constants.*", "xmlsec.tree.*", "xmlsec.template.*"]
ignore = "migrations"
max-attributes = 12
@ -105,7 +105,7 @@ max-branches = 20
DJANGO_SETTINGS_MODULE = "authentik.root.settings"
python_files = ["tests.py", "test_*.py", "*_tests.py"]
junit_family = "xunit2"
addopts = "-p no:celery --junitxml=unittest.xml"
addopts = "-p no:celery --junitxml=unittest.xml -vv --full-trace --doctest-modules"
filterwarnings = [
"ignore:defusedxml.lxml is no longer supported and will be removed in a future release.:DeprecationWarning",
"ignore:SelectableGroups dict interface is deprecated. Use select.:DeprecationWarning",
@ -131,7 +131,6 @@ django = "*"
django-filter = "*"
django-guardian = "*"
django-model-utils = "*"
django-otp = "*"
django-prometheus = "*"
django-redis = "*"
djangorestframework = "*"
@ -183,6 +182,7 @@ coverage = { extras = ["toml"], version = "*" }
debugpy = "*"
django-silk = "*"
drf-jsonschema-serializer = "*"
freezegun = "*"
importlib-metadata = "*"
pdoc = "*"
pylint = "*"

View file

@ -885,7 +885,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -918,7 +918,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -957,7 +957,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -995,7 +995,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -2030,7 +2030,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -2063,7 +2063,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -2102,7 +2102,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -2140,7 +2140,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -2170,7 +2170,7 @@ paths:
name: id
schema:
type: integer
description: A unique integer value identifying this static device.
description: A unique integer value identifying this Static device.
required: true
tags:
- authenticators
@ -26693,6 +26693,7 @@ components:
- authentik.sources.oauth
- authentik.sources.plex
- authentik.sources.saml
- authentik.stages.authenticator
- authentik.stages.authenticator_duo
- authentik.stages.authenticator_sms
- authentik.stages.authenticator_static
@ -26742,6 +26743,7 @@ components:
* `authentik.sources.oauth` - authentik Sources.OAuth
* `authentik.sources.plex` - authentik Sources.Plex
* `authentik.sources.saml` - authentik Sources.SAML
* `authentik.stages.authenticator` - authentik Stages.Authenticator
* `authentik.stages.authenticator_duo` - authentik Stages.Authenticator.Duo
* `authentik.stages.authenticator_sms` - authentik Stages.Authenticator.SMS
* `authentik.stages.authenticator_static` - authentik Stages.Authenticator.Static
@ -29476,6 +29478,7 @@ components:
* `authentik.sources.oauth` - authentik Sources.OAuth
* `authentik.sources.plex` - authentik Sources.Plex
* `authentik.sources.saml` - authentik Sources.SAML
* `authentik.stages.authenticator` - authentik Stages.Authenticator
* `authentik.stages.authenticator_duo` - authentik Stages.Authenticator.Duo
* `authentik.stages.authenticator_sms` - authentik Stages.Authenticator.SMS
* `authentik.stages.authenticator_static` - authentik Stages.Authenticator.Static
@ -29550,7 +29553,9 @@ components:
* `authentik_stages_authenticator_sms.authenticatorsmsstage` - SMS Authenticator Setup Stage
* `authentik_stages_authenticator_sms.smsdevice` - SMS Device
* `authentik_stages_authenticator_static.authenticatorstaticstage` - Static Authenticator Stage
* `authentik_stages_authenticator_static.staticdevice` - Static device
* `authentik_stages_authenticator_totp.authenticatortotpstage` - TOTP Authenticator Setup Stage
* `authentik_stages_authenticator_totp.totpdevice` - TOTP device
* `authentik_stages_authenticator_validate.authenticatorvalidatestage` - Authenticator Validation Stage
* `authentik_stages_authenticator_webauthn.authenticatewebauthnstage` - WebAuthn Authenticator Setup Stage
* `authentik_stages_authenticator_webauthn.webauthndevice` - WebAuthn Device
@ -29666,6 +29671,7 @@ components:
* `authentik.sources.oauth` - authentik Sources.OAuth
* `authentik.sources.plex` - authentik Sources.Plex
* `authentik.sources.saml` - authentik Sources.SAML
* `authentik.stages.authenticator` - authentik Stages.Authenticator
* `authentik.stages.authenticator_duo` - authentik Stages.Authenticator.Duo
* `authentik.stages.authenticator_sms` - authentik Stages.Authenticator.SMS
* `authentik.stages.authenticator_static` - authentik Stages.Authenticator.Static
@ -29740,7 +29746,9 @@ components:
* `authentik_stages_authenticator_sms.authenticatorsmsstage` - SMS Authenticator Setup Stage
* `authentik_stages_authenticator_sms.smsdevice` - SMS Device
* `authentik_stages_authenticator_static.authenticatorstaticstage` - Static Authenticator Stage
* `authentik_stages_authenticator_static.staticdevice` - Static device
* `authentik_stages_authenticator_totp.authenticatortotpstage` - TOTP Authenticator Setup Stage
* `authentik_stages_authenticator_totp.totpdevice` - TOTP device
* `authentik_stages_authenticator_validate.authenticatorvalidatestage` - Authenticator Validation Stage
* `authentik_stages_authenticator_webauthn.authenticatewebauthnstage` - WebAuthn Authenticator Setup Stage
* `authentik_stages_authenticator_webauthn.webauthndevice` - WebAuthn Device
@ -31898,7 +31906,9 @@ components:
- authentik_stages_authenticator_sms.authenticatorsmsstage
- authentik_stages_authenticator_sms.smsdevice
- authentik_stages_authenticator_static.authenticatorstaticstage
- authentik_stages_authenticator_static.staticdevice
- authentik_stages_authenticator_totp.authenticatortotpstage
- authentik_stages_authenticator_totp.totpdevice
- authentik_stages_authenticator_validate.authenticatorvalidatestage
- authentik_stages_authenticator_webauthn.authenticatewebauthnstage
- authentik_stages_authenticator_webauthn.webauthndevice
@ -31970,7 +31980,9 @@ components:
* `authentik_stages_authenticator_sms.authenticatorsmsstage` - SMS Authenticator Setup Stage
* `authentik_stages_authenticator_sms.smsdevice` - SMS Device
* `authentik_stages_authenticator_static.authenticatorstaticstage` - Static Authenticator Stage
* `authentik_stages_authenticator_static.staticdevice` - Static device
* `authentik_stages_authenticator_totp.authenticatortotpstage` - TOTP Authenticator Setup Stage
* `authentik_stages_authenticator_totp.totpdevice` - TOTP device
* `authentik_stages_authenticator_validate.authenticatorvalidatestage` - Authenticator Validation Stage
* `authentik_stages_authenticator_webauthn.authenticatewebauthnstage` - WebAuthn Authenticator Setup Stage
* `authentik_stages_authenticator_webauthn.webauthndevice` - WebAuthn Device
@ -34883,6 +34895,7 @@ components:
* `authentik.sources.oauth` - authentik Sources.OAuth
* `authentik.sources.plex` - authentik Sources.Plex
* `authentik.sources.saml` - authentik Sources.SAML
* `authentik.stages.authenticator` - authentik Stages.Authenticator
* `authentik.stages.authenticator_duo` - authentik Stages.Authenticator.Duo
* `authentik.stages.authenticator_sms` - authentik Stages.Authenticator.SMS
* `authentik.stages.authenticator_static` - authentik Stages.Authenticator.Static
@ -34957,7 +34970,9 @@ components:
* `authentik_stages_authenticator_sms.authenticatorsmsstage` - SMS Authenticator Setup Stage
* `authentik_stages_authenticator_sms.smsdevice` - SMS Device
* `authentik_stages_authenticator_static.authenticatorstaticstage` - Static Authenticator Stage
* `authentik_stages_authenticator_static.staticdevice` - Static device
* `authentik_stages_authenticator_totp.authenticatortotpstage` - TOTP Authenticator Setup Stage
* `authentik_stages_authenticator_totp.totpdevice` - TOTP device
* `authentik_stages_authenticator_validate.authenticatorvalidatestage` - Authenticator Validation Stage
* `authentik_stages_authenticator_webauthn.authenticatewebauthnstage` - WebAuthn Authenticator Setup Stage
* `authentik_stages_authenticator_webauthn.webauthndevice` - WebAuthn Device

View file

@ -3,9 +3,6 @@ from base64 import b32decode
from time import sleep
from urllib.parse import parse_qs, urlparse
from django_otp.oath import TOTP
from django_otp.plugins.otp_static.models import StaticDevice, StaticToken
from django_otp.plugins.otp_totp.models import TOTPDevice
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
@ -13,8 +10,13 @@ from selenium.webdriver.support.wait import WebDriverWait
from authentik.blueprints.tests import apply_blueprint
from authentik.flows.models import Flow
from authentik.stages.authenticator_static.models import AuthenticatorStaticStage
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage
from authentik.stages.authenticator.oath import TOTP
from authentik.stages.authenticator_static.models import (
AuthenticatorStaticStage,
StaticDevice,
StaticToken,
)
from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage, TOTPDevice
from tests.e2e.utils import SeleniumTestCase, retry

View file

@ -46,11 +46,11 @@ export class UserDeviceList extends MFADevicesPage {
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsAdminSmsDestroy({
id: device.pk,
});
case "otp_totp.TOTPDevice":
case "authentik_stages_authenticator_totp.TOTPDevice":
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsAdminTotpDestroy({
id: device.pk,
});
case "otp_static.StaticDevice":
case "authentik_stages_authenticator_static.StaticDevice":
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsAdminStaticDestroy({
id: device.pk,
});

View file

@ -39,13 +39,13 @@ export class MFADeviceForm extends ModelForm<Device, number> {
sMSDeviceRequest: device,
});
break;
case "otp_totp.TOTPDevice":
case "authentik_stages_authenticator_totp.TOTPDevice":
await new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsTotpUpdate({
id: this.instance?.pk,
tOTPDeviceRequest: device,
});
break;
case "otp_static.StaticDevice":
case "authentik_stages_authenticator_static.StaticDevice":
await new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsStaticUpdate({
id: this.instance?.pk,
staticDeviceRequest: device,

View file

@ -24,9 +24,9 @@ export function stageToAuthenticatorName(stage: UserSetting): string {
export function deviceTypeName(device: Device): string {
switch (device.type) {
case "otp_static.StaticDevice":
case "authentik_stages_authenticator_static.StaticDevice":
return msg("Static tokens");
case "otp_totp.TOTPDevice":
case "authentik_stages_authenticator_totp.TOTPDevice":
return msg("TOTP Device");
default:
return device.verboseName;
@ -102,11 +102,11 @@ export class MFADevicesPage extends Table<Device> {
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsSmsDestroy({
id: device.pk,
});
case "otp_totp.TOTPDevice":
case "authentik_stages_authenticator_totp.TOTPDevice":
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsTotpDestroy({
id: device.pk,
});
case "otp_static.StaticDevice":
case "authentik_stages_authenticator_static.StaticDevice":
return new AuthenticatorsApi(DEFAULT_CONFIG).authenticatorsStaticDestroy({
id: device.pk,
});