sources/oauth: fix name clash (#7253)

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L 2023-10-21 20:29:36 +02:00 committed by GitHub
parent 52a452488c
commit 616f0b8b4f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 23 additions and 24 deletions

View file

@ -54,7 +54,7 @@ class OAuthSourceSerializer(SourceSerializer):
@extend_schema_field(SourceTypeSerializer) @extend_schema_field(SourceTypeSerializer)
def get_type(self, instance: OAuthSource) -> SourceTypeSerializer: def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
"""Get source's type configuration""" """Get source's type configuration"""
return SourceTypeSerializer(instance.type).data return SourceTypeSerializer(instance.source_type).data
def validate(self, attrs: dict) -> dict: def validate(self, attrs: dict) -> dict:
session = get_http_session() session = get_http_session()

View file

@ -36,8 +36,8 @@ class BaseOAuthClient:
def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]: def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]:
"""Fetch user profile information.""" """Fetch user profile information."""
profile_url = self.source.type.profile_url or "" profile_url = self.source.source_type.profile_url or ""
if self.source.type.urls_customizable and self.source.profile_url: if self.source.source_type.urls_customizable and self.source.profile_url:
profile_url = self.source.profile_url profile_url = self.source.profile_url
response = self.do_request("get", profile_url, token=token) response = self.do_request("get", profile_url, token=token)
try: try:
@ -57,8 +57,8 @@ class BaseOAuthClient:
def get_redirect_url(self, parameters=None): def get_redirect_url(self, parameters=None):
"""Build authentication redirect url.""" """Build authentication redirect url."""
authorization_url = self.source.type.authorization_url or "" authorization_url = self.source.source_type.authorization_url or ""
if self.source.type.urls_customizable and self.source.authorization_url: if self.source.source_type.urls_customizable and self.source.authorization_url:
authorization_url = self.source.authorization_url authorization_url = self.source.authorization_url
if authorization_url == "": if authorization_url == "":
Event.new( Event.new(

View file

@ -28,8 +28,8 @@ class OAuthClient(BaseOAuthClient):
if raw_token is not None and verifier is not None: if raw_token is not None and verifier is not None:
token = self.parse_raw_token(raw_token) token = self.parse_raw_token(raw_token)
try: try:
access_token_url = self.source.type.access_token_url or "" access_token_url = self.source.source_type.access_token_url or ""
if self.source.type.urls_customizable and self.source.access_token_url: if self.source.source_type.urls_customizable and self.source.access_token_url:
access_token_url = self.source.access_token_url access_token_url = self.source.access_token_url
response = self.do_request( response = self.do_request(
"post", "post",
@ -54,8 +54,8 @@ class OAuthClient(BaseOAuthClient):
"""Fetch the OAuth request token. Only required for OAuth 1.0.""" """Fetch the OAuth request token. Only required for OAuth 1.0."""
callback = self.request.build_absolute_uri(self.callback) callback = self.request.build_absolute_uri(self.callback)
try: try:
request_token_url = self.source.type.request_token_url or "" request_token_url = self.source.source_type.request_token_url or ""
if self.source.type.urls_customizable and self.source.request_token_url: if self.source.source_type.urls_customizable and self.source.request_token_url:
request_token_url = self.source.request_token_url request_token_url = self.source.request_token_url
response = self.do_request( response = self.do_request(
"post", "post",

View file

@ -76,8 +76,8 @@ class OAuth2Client(BaseOAuthClient):
if SESSION_KEY_OAUTH_PKCE in self.request.session: if SESSION_KEY_OAUTH_PKCE in self.request.session:
args["code_verifier"] = self.request.session[SESSION_KEY_OAUTH_PKCE] args["code_verifier"] = self.request.session[SESSION_KEY_OAUTH_PKCE]
try: try:
access_token_url = self.source.type.access_token_url or "" access_token_url = self.source.source_type.access_token_url or ""
if self.source.type.urls_customizable and self.source.access_token_url: if self.source.source_type.urls_customizable and self.source.access_token_url:
access_token_url = self.source.access_token_url access_token_url = self.source.access_token_url
response = self.session.request( response = self.session.request(
"post", access_token_url, data=args, headers=self._default_headers, **request_kwargs "post", access_token_url, data=args, headers=self._default_headers, **request_kwargs
@ -140,8 +140,8 @@ class UserprofileHeaderAuthClient(OAuth2Client):
def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]: def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]:
"Fetch user profile information." "Fetch user profile information."
profile_url = self.source.type.profile_url or "" profile_url = self.source.source_type.profile_url or ""
if self.source.type.urls_customizable and self.source.profile_url: if self.source.source_type.urls_customizable and self.source.profile_url:
profile_url = self.source.profile_url profile_url = self.source.profile_url
response = self.session.request( response = self.session.request(
"get", "get",

View file

@ -1,5 +1,5 @@
"""OAuth Client models""" """OAuth Client models"""
from typing import TYPE_CHECKING, Optional, Type from typing import TYPE_CHECKING, Optional
from django.db import models from django.db import models
from django.http.request import HttpRequest from django.http.request import HttpRequest
@ -55,7 +55,7 @@ class OAuthSource(Source):
oidc_jwks = models.JSONField(default=dict, blank=True) oidc_jwks = models.JSONField(default=dict, blank=True)
@property @property
def type(self) -> type["SourceType"]: def source_type(self) -> type["SourceType"]:
"""Return the provider instance for this source""" """Return the provider instance for this source"""
from authentik.sources.oauth.types.registry import registry from authentik.sources.oauth.types.registry import registry
@ -65,15 +65,14 @@ class OAuthSource(Source):
def component(self) -> str: def component(self) -> str:
return "ak-source-oauth-form" return "ak-source-oauth-form"
# we're using Type[] instead of type[] here since type[] interferes with the property above
@property @property
def serializer(self) -> Type[Serializer]: def serializer(self) -> type[Serializer]:
from authentik.sources.oauth.api.source import OAuthSourceSerializer from authentik.sources.oauth.api.source import OAuthSourceSerializer
return OAuthSourceSerializer return OAuthSourceSerializer
def ui_login_button(self, request: HttpRequest) -> UILoginButton: def ui_login_button(self, request: HttpRequest) -> UILoginButton:
provider_type = self.type provider_type = self.source_type
provider = provider_type() provider = provider_type()
icon = self.get_icon icon = self.get_icon
if not icon: if not icon:
@ -85,7 +84,7 @@ class OAuthSource(Source):
) )
def ui_user_settings(self) -> Optional[UserSettingSerializer]: def ui_user_settings(self) -> Optional[UserSettingSerializer]:
provider_type = self.type provider_type = self.source_type
icon = self.get_icon icon = self.get_icon
if not icon: if not icon:
icon = provider_type().icon_url() icon = provider_type().icon_url()

View file

@ -23,8 +23,8 @@ class GitHubOAuth2Client(OAuth2Client):
def get_github_emails(self, token: dict[str, str]) -> list[dict[str, Any]]: def get_github_emails(self, token: dict[str, str]) -> list[dict[str, Any]]:
"""Get Emails from the GitHub API""" """Get Emails from the GitHub API"""
profile_url = self.source.type.profile_url or "" profile_url = self.source.source_type.profile_url or ""
if self.source.type.urls_customizable and self.source.profile_url: if self.source.source_type.urls_customizable and self.source.profile_url:
profile_url = self.source.profile_url profile_url = self.source.profile_url
profile_url += "/emails" profile_url += "/emails"
response = self.do_request("get", profile_url, token=token) response = self.do_request("get", profile_url, token=token)

View file

@ -26,8 +26,8 @@ class MailcowOAuth2Client(OAuth2Client):
def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]: def get_profile_info(self, token: dict[str, str]) -> Optional[dict[str, Any]]:
"Fetch user profile information." "Fetch user profile information."
profile_url = self.source.type.profile_url or "" profile_url = self.source.source_type.profile_url or ""
if self.source.type.urls_customizable and self.source.profile_url: if self.source.source_type.urls_customizable and self.source.profile_url:
profile_url = self.source.profile_url profile_url = self.source.profile_url
response = self.session.request( response = self.session.request(
"get", "get",

View file

@ -25,7 +25,7 @@ class OAuthClientMixin:
if self.client_class is not None: if self.client_class is not None:
# pylint: disable=not-callable # pylint: disable=not-callable
return self.client_class(source, self.request, **kwargs) return self.client_class(source, self.request, **kwargs)
if source.type.request_token_url or source.request_token_url: if source.source_type.request_token_url or source.request_token_url:
client = OAuthClient(source, self.request, **kwargs) client = OAuthClient(source, self.request, **kwargs)
else: else:
client = OAuth2Client(source, self.request, **kwargs) client = OAuth2Client(source, self.request, **kwargs)