114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
|
"""passbook OAuth2 Token Introspection Views"""
|
||
|
from dataclasses import InitVar, dataclass
|
||
|
from typing import Optional
|
||
|
|
||
|
from django.http import HttpRequest, HttpResponse
|
||
|
from django.views import View
|
||
|
from structlog import get_logger
|
||
|
|
||
|
from passbook.providers.oauth2.constants import SCOPE_OPENID_INTROSPECTION
|
||
|
from passbook.providers.oauth2.errors import TokenIntrospectionError
|
||
|
from passbook.providers.oauth2.models import IDToken, OAuth2Provider, RefreshToken
|
||
|
from passbook.providers.oauth2.utils import TokenResponse, extract_client_auth
|
||
|
|
||
|
# Module-level structlog logger; used for the debug messages emitted when
# an introspection request is rejected.
LOGGER = get_logger()
|
||
|
|
||
|
|
||
|
@dataclass
class TokenIntrospectionParams:
    """Validated parameters of a single Token Introspection request.

    All lookups and checks run during construction (see `__post_init__`);
    an instance only exists if every check passed, otherwise
    TokenIntrospectionError is raised.
    """

    client_id: str
    client_secret: str

    # Init-only value: handed to __post_init__ and not stored as a field
    raw_token: InitVar[str]

    # Filled in by __post_init__
    token: Optional[RefreshToken] = None
    provider: Optional[OAuth2Provider] = None
    id_token: Optional[IDToken] = None

    def __post_init__(self, raw_token: str):
        """Resolve token, provider and ID token, in that order.

        Raises TokenIntrospectionError as soon as one of the steps fails.
        """
        self._resolve_token(raw_token)
        self._resolve_provider()
        self._resolve_id_token()

    def _resolve_token(self, raw_token: str):
        """Look up the RefreshToken by its access token and reject expired ones"""
        try:
            self.token = RefreshToken.objects.get(access_token=raw_token)
        except RefreshToken.DoesNotExist:
            LOGGER.debug("Token does not exist", token=raw_token)
            raise TokenIntrospectionError()

        if not self.token.has_expired():
            return
        LOGGER.debug("Token is not valid", token=raw_token)
        raise TokenIntrospectionError()

    def _resolve_provider(self):
        """Look up the provider by client credentials and require the introspection scope"""
        credentials = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        try:
            self.provider = OAuth2Provider.objects.get(**credentials)
        except OAuth2Provider.DoesNotExist:
            LOGGER.debug("provider for ID not found", client_id=self.client_id)
            raise TokenIntrospectionError()

        if SCOPE_OPENID_INTROSPECTION in self.provider.scope_names:
            return
        LOGGER.debug(
            "OAuth2Provider does not have introspection scope",
            client_id=self.client_id,
        )
        raise TokenIntrospectionError()

    def _resolve_id_token(self):
        """Store the token's ID token and check its audience against the provider's scope names"""
        self.id_token = self.token.id_token

        if not self.token.id_token:
            LOGGER.debug("token not an authentication token", token=self.token)
            raise TokenIntrospectionError()

        aud = self.token.id_token.aud
        if not aud:
            LOGGER.debug("No audience found for token", token=self.token)
            raise TokenIntrospectionError()

        if aud in self.provider.scope_names:
            return
        LOGGER.debug(
            "provider does not audience scope",
            client_id=self.client_id,
            audience=aud,
        )
        raise TokenIntrospectionError()

    @staticmethod
    def from_request(request: HttpRequest) -> "TokenIntrospectionParams":
        """Build the parameters from the client credentials and POST data of a request"""
        client_id, client_secret = extract_client_auth(request)
        # Only POST is supported for introspection, so the token is read
        # from the form body
        posted_token = request.POST.get("token")
        return TokenIntrospectionParams(
            client_id=client_id,
            client_secret=client_secret,
            raw_token=posted_token,
        )
|
||
|
|
||
|
|
||
|
class TokenIntrospectionView(View):
    """Token Introspection
    https://tools.ietf.org/html/rfc7662

    Responds to POST requests with the selected ID token claims plus
    `active` and `client_id` for a valid token, and with
    `{"active": False}` when validation of the request fails."""

    token: RefreshToken
    params: TokenIntrospectionParams
    provider: OAuth2Provider
    id_token: IDToken

    def post(self, request: HttpRequest) -> HttpResponse:
        """Introspection handler

        Builds TokenIntrospectionParams from the request and returns a
        TokenResponse. Any TokenIntrospectionError raised while validating
        the request is turned into an inactive-token response."""
        try:
            # Parameter construction is what raises TokenIntrospectionError,
            # so it has to happen inside the try block for the handler
            # below to take effect.
            self.params = TokenIntrospectionParams.from_request(request)

            # The class only declares these attributes; populate them from
            # the validated parameters before they are read.
            self.token = self.params.token
            self.provider = self.params.provider
            self.id_token = self.params.id_token

            response_dic = {}
            if self.id_token:
                token_dict = self.id_token.to_dict()
                for key in ("aud", "sub", "exp", "iat", "iss"):
                    response_dic[key] = token_dict[key]
            response_dic["active"] = True
            response_dic["client_id"] = self.token.provider.client_id

            return TokenResponse(response_dic)
        except TokenIntrospectionError:
            return TokenResponse({"active": False})
|