"""passbook OAuth2 OpenID Userinfo views""" from typing import Any, Dict, List from django.http import HttpRequest, HttpResponse from django.utils.translation import ugettext_lazy as _ from django.views import View from structlog import get_logger from passbook.providers.oauth2.constants import ( SCOPE_GITHUB_ORG_READ, SCOPE_GITHUB_USER, SCOPE_GITHUB_USER_EMAIL, SCOPE_GITHUB_USER_READ, ) from passbook.providers.oauth2.models import RefreshToken, ScopeMapping from passbook.providers.oauth2.utils import TokenResponse, cors_allow_any LOGGER = get_logger() class UserInfoView(View): """Create a dictionary with all the requested claims about the End-User. See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse""" def get_scope_descriptions(self, scopes: List[str]) -> List[str]: """Get a list of all Scopes's descriptions""" scope_descriptions = [] for scope in ScopeMapping.objects.filter(scope_name__in=scopes).order_by( "scope_name" ): if scope.description != "": scope_descriptions.append(scope.description) # GitHub Compatibility Scopes are handeled differently, since they required custom paths # Hence they don't exist as Scope objects github_scope_map = { SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"), SCOPE_GITHUB_USER_READ: _( "GitHub Compatibility: Access your User Information" ), SCOPE_GITHUB_USER_EMAIL: _( "GitHub Compatibility: Access you Email addresses" ), SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"), } for scope in scopes: if scope in github_scope_map: scope_descriptions.append(github_scope_map[scope]) return scope_descriptions def get_claims(self, token: RefreshToken) -> Dict[str, Any]: """Get a dictionary of claims from scopes that the token requires and are assigned to the provider.""" scopes_from_client = token.scope final_claims = {} for scope in ScopeMapping.objects.filter( provider=token.provider, scope_name__in=scopes_from_client ).order_by("scope_name"): value = scope.evaluate( user=token.user, request=self.request, provider=token.provider, token=token, ) if value is None: continue if not isinstance(value, dict): LOGGER.warning( "Scope returned a non-dict value, ignoring", scope=scope, value=value, ) continue LOGGER.debug("updated scope", scope=scope) final_claims.update(value) return final_claims def options(self, request: HttpRequest) -> HttpResponse: return cors_allow_any(self.request, TokenResponse({})) def get(self, request: HttpRequest, **kwargs) -> HttpResponse: """Handle GET Requests for UserInfo""" token: RefreshToken = kwargs["token"] claims = self.get_claims(token) claims["sub"] = token.id_token.sub response = TokenResponse(claims) cors_allow_any(self.request, response) return response def post(self, request: HttpRequest, **kwargs) -> HttpResponse: """POST Requests behave the same as GET Requests, so the get handler is called here""" return self.get(request, **kwargs)