diff --git a/authentik/providers/oauth2/tests/test_authorize.py b/authentik/providers/oauth2/tests/test_authorize.py index 9f0aebb4e..128f3fa15 100644 --- a/authentik/providers/oauth2/tests/test_authorize.py +++ b/authentik/providers/oauth2/tests/test_authorize.py @@ -322,18 +322,18 @@ class TestAuthorize(OAuthTestCase): ) self.validate_jwt(token, provider) - def test_full_form_post(self): + def test_full_form_post_id_token(self): """Test full authorization (form_post response)""" flow = create_test_flow() provider = OAuth2Provider.objects.create( name=generate_id(), - client_id="test", + client_id=generate_id(), client_secret=generate_key(), authorization_flow=flow, redirect_uris="http://localhost", signing_key=self.keypair, ) - Application.objects.create(name="app", slug="app", provider=provider) + app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) state = generate_id() user = create_test_admin_user() self.client.force_login(user) @@ -343,7 +343,7 @@ class TestAuthorize(OAuthTestCase): data={ "response_type": "id_token", "response_mode": "form_post", - "client_id": "test", + "client_id": provider.client_id, "state": state, "scope": "openid", "redirect_uri": "http://localhost", @@ -359,7 +359,7 @@ class TestAuthorize(OAuthTestCase): "component": "ak-stage-autosubmit", "type": ChallengeTypes.NATIVE.value, "url": "http://localhost", - "title": "Redirecting to app...", + "title": f"Redirecting to {app.name}...", "attrs": { "access_token": token.access_token, "id_token": provider.encode(token.id_token.to_dict()), @@ -370,3 +370,48 @@ class TestAuthorize(OAuthTestCase): }, ) self.validate_jwt(token, provider) + + def test_full_form_post_code(self): + """Test full authorization (form_post response, code type)""" + flow = create_test_flow() + provider = OAuth2Provider.objects.create( + name=generate_id(), + client_id=generate_id(), + client_secret=generate_key(), + authorization_flow=flow, + redirect_uris="http://localhost", + signing_key=self.keypair, + ) + app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) + state = generate_id() + user = create_test_admin_user() + self.client.force_login(user) + # Step 1, initiate params and get redirect to flow + self.client.get( + reverse("authentik_providers_oauth2:authorize"), + data={ + "response_type": "code", + "response_mode": "form_post", + "client_id": provider.client_id, + "state": state, + "scope": "openid", + "redirect_uri": "http://localhost", + }, + ) + response = self.client.get( + reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), + ) + code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() + self.assertJSONEqual( + response.content.decode(), + { + "component": "ak-stage-autosubmit", + "type": ChallengeTypes.NATIVE.value, + "url": "http://localhost", + "title": f"Redirecting to {app.name}...", + "attrs": { + "code": code.code, + "state": state, + }, + }, + ) diff --git a/authentik/providers/oauth2/views/authorize.py b/authentik/providers/oauth2/views/authorize.py index aeeb6d82a..1a2a9851b 100644 --- a/authentik/providers/oauth2/views/authorize.py +++ b/authentik/providers/oauth2/views/authorize.py @@ -465,7 +465,6 @@ class OAuthFulfillmentStage(StageView): def create_response_uri(self) -> str: """Create a final Response URI the user is redirected to.""" uri = urlsplit(self.params.redirect_uri) - query_params = parse_qs(uri.query) try: code = None @@ -478,6 +477,7 @@ class OAuthFulfillmentStage(StageView): code.save(force_insert=True) if self.params.response_mode == ResponseMode.QUERY: + query_params = parse_qs(uri.query) query_params["code"] = code.code query_params["state"] = [str(self.params.state) if self.params.state else ""] @@ -494,7 +494,12 @@ class OAuthFulfillmentStage(StageView): return urlunsplit(uri) if self.params.response_mode == ResponseMode.FORM_POST: - post_params = self.create_implicit_response(code) + post_params = {} + if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]: + post_params["code"] = code.code + post_params["state"] = [str(self.params.state) if self.params.state else ""] + else: + post_params = self.create_implicit_response(code) uri = uri._replace(query=urlencode(post_params, doseq=True))