stages/user_write: add tests for duplicate data

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-06-07 16:09:56 +02:00
parent 0f693158b6
commit f23111beff
2 changed files with 52 additions and 4 deletions

View file

@ -2,6 +2,7 @@
from django.contrib import messages from django.contrib import messages
from django.contrib.auth import update_session_auth_hash from django.contrib.auth import update_session_auth_hash
from django.contrib.auth.backends import ModelBackend from django.contrib.auth.backends import ModelBackend
from django.db import transaction
from django.db.utils import IntegrityError from django.db.utils import IntegrityError
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
@ -86,10 +87,11 @@ class UserWriteStageView(StageView):
] ]
user.attributes[USER_ATTRIBUTE_SOURCES].append(connection.source.name) user.attributes[USER_ATTRIBUTE_SOURCES].append(connection.source.name)
try: try:
user.save() with transaction.atomic():
user.save()
except IntegrityError as exc: except IntegrityError as exc:
LOGGER.warning("Failed to save user", exc=exc) LOGGER.warning("Failed to save user", exc=exc)
self.executor.stage_invalid() return self.executor.stage_invalid()
user_write.send( user_write.send(
sender=self, request=request, user=user, data=data, created=user_created sender=self, request=request, user=user, data=data, created=user_created
) )

View file

@ -7,7 +7,13 @@ from django.test import Client, TestCase
from django.urls import reverse from django.urls import reverse
from django.utils.encoding import force_str from django.utils.encoding import force_str
from authentik.core.models import User from authentik.core.models import (
USER_ATTRIBUTE_SOURCES,
Source,
User,
UserSourceConnection,
)
from authentik.core.sources.stage import PLAN_CONTEXT_SOURCES_CONNECTION
from authentik.flows.challenge import ChallengeTypes from authentik.flows.challenge import ChallengeTypes
from authentik.flows.markers import StageMarker from authentik.flows.markers import StageMarker
from authentik.flows.models import Flow, FlowDesignation, FlowStageBinding from authentik.flows.models import Flow, FlowDesignation, FlowStageBinding
@ -32,6 +38,7 @@ class TestUserWriteStage(TestCase):
) )
self.stage = UserWriteStage.objects.create(name="write") self.stage = UserWriteStage.objects.create(name="write")
FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
self.source = Source.objects.create(name="fake_source")
def test_user_create(self): def test_user_create(self):
"""Test creation of user""" """Test creation of user"""
@ -49,6 +56,9 @@ class TestUserWriteStage(TestCase):
"email": "test@beryju.org", "email": "test@beryju.org",
"password": password, "password": password,
} }
plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(
source=self.source
)
session = self.client.session session = self.client.session
session[SESSION_KEY_PLAN] = plan session[SESSION_KEY_PLAN] = plan
session.save() session.save()
@ -71,6 +81,9 @@ class TestUserWriteStage(TestCase):
) )
self.assertTrue(user_qs.exists()) self.assertTrue(user_qs.exists())
self.assertTrue(user_qs.first().check_password(password)) self.assertTrue(user_qs.first().check_password(password))
self.assertEqual(
user_qs.first().attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]}
)
def test_user_update(self): def test_user_update(self):
"""Test update of existing user""" """Test update of existing user"""
@ -147,7 +160,7 @@ class TestUserWriteStage(TestCase):
"authentik.flows.views.to_stage_response", "authentik.flows.views.to_stage_response",
TO_STAGE_RESPONSE_MOCK, TO_STAGE_RESPONSE_MOCK,
) )
def test_with_blank_username(self): def test_blank_username(self):
"""Test with blank username results in error""" """Test with blank username results in error"""
plan = FlowPlan( plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()] flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
@ -175,3 +188,36 @@ class TestUserWriteStage(TestCase):
"type": ChallengeTypes.NATIVE.value, "type": ChallengeTypes.NATIVE.value,
}, },
) )
@patch(
"authentik.flows.views.to_stage_response",
TO_STAGE_RESPONSE_MOCK,
)
def test_duplicate_data(self):
"""Test with duplicate data, should trigger error"""
plan = FlowPlan(
flow_pk=self.flow.pk.hex, stages=[self.stage], markers=[StageMarker()]
)
session = self.client.session
plan.context[PLAN_CONTEXT_PROMPT] = {
"username": "akadmin",
"attribute_some-custom-attribute": "test",
"some_ignored_attribute": "bar",
}
session[SESSION_KEY_PLAN] = plan
session.save()
response = self.client.get(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
)
self.assertEqual(response.status_code, 200)
self.assertJSONEqual(
force_str(response.content),
{
"component": "ak-stage-access-denied",
"error_message": None,
"title": "",
"type": ChallengeTypes.NATIVE.value,
},
)