api: ensure user is active when authenticating
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
parent
a5e84b5482
commit
c9c059a008
|
@ -31,6 +31,16 @@ def validate_auth(header: bytes) -> Optional[str]:
|
|||
|
||||
|
||||
def bearer_auth(raw_header: bytes) -> Optional[User]:
|
||||
"""raw_header in the Format of `Bearer ....`"""
|
||||
user = auth_user_lookup(raw_header)
|
||||
if not user:
|
||||
return None
|
||||
if not user.is_active:
|
||||
raise AuthenticationFailed("Token invalid/expired")
|
||||
return user
|
||||
|
||||
|
||||
def auth_user_lookup(raw_header: bytes) -> Optional[User]:
|
||||
"""raw_header in the Format of `Bearer ....`"""
|
||||
from authentik.providers.oauth2.models import RefreshToken
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ from rest_framework.exceptions import AuthenticationFailed
|
|||
from authentik.api.authentication import bearer_auth
|
||||
from authentik.blueprints.tests import reconcile_app
|
||||
from authentik.core.models import USER_ATTRIBUTE_SA, Token, TokenIntents
|
||||
from authentik.core.tests.utils import create_test_flow
|
||||
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
|
||||
from authentik.lib.generators import generate_id
|
||||
from authentik.providers.oauth2.constants import SCOPE_AUTHENTIK_API
|
||||
from authentik.providers.oauth2.models import OAuth2Provider, RefreshToken
|
||||
|
@ -36,9 +36,18 @@ class TestAPIAuth(TestCase):
|
|||
|
||||
def test_bearer_valid(self):
|
||||
"""Test valid token"""
|
||||
token = Token.objects.create(intent=TokenIntents.INTENT_API, user=get_anonymous_user())
|
||||
token = Token.objects.create(intent=TokenIntents.INTENT_API, user=create_test_admin_user())
|
||||
self.assertEqual(bearer_auth(f"Bearer {token.key}".encode()), token.user)
|
||||
|
||||
def test_bearer_valid_deactivated(self):
|
||||
"""Test valid token"""
|
||||
user = create_test_admin_user()
|
||||
user.is_active = False
|
||||
user.save()
|
||||
token = Token.objects.create(intent=TokenIntents.INTENT_API, user=user)
|
||||
with self.assertRaises(AuthenticationFailed):
|
||||
bearer_auth(f"Bearer {token.key}".encode())
|
||||
|
||||
def test_managed_outpost(self):
|
||||
"""Test managed outpost"""
|
||||
with self.assertRaises(AuthenticationFailed):
|
||||
|
@ -56,7 +65,7 @@ class TestAPIAuth(TestCase):
|
|||
name=generate_id(), client_id=generate_id(), authorization_flow=create_test_flow()
|
||||
)
|
||||
refresh = RefreshToken.objects.create(
|
||||
user=get_anonymous_user(),
|
||||
user=create_test_admin_user(),
|
||||
provider=provider,
|
||||
refresh_token=generate_id(),
|
||||
_scope=SCOPE_AUTHENTIK_API,
|
||||
|
@ -69,7 +78,7 @@ class TestAPIAuth(TestCase):
|
|||
name=generate_id(), client_id=generate_id(), authorization_flow=create_test_flow()
|
||||
)
|
||||
refresh = RefreshToken.objects.create(
|
||||
user=get_anonymous_user(),
|
||||
user=create_test_admin_user(),
|
||||
provider=provider,
|
||||
refresh_token=generate_id(),
|
||||
_scope="",
|
||||
|
|
Reference in a new issue