From 764282ea9ee3a672306c1bce57052c2ee85c64b3 Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Sun, 16 Dec 2018 17:09:26 +0100 Subject: [PATCH] saml_idp: Rewrite to CBV --- passbook/core/forms/authentication.py | 7 - passbook/lib/mixins.py | 12 ++ passbook/saml_idp/urls.py | 9 +- passbook/saml_idp/views.py | 255 ++++++++++++++------------ 4 files changed, 150 insertions(+), 133 deletions(-) create mode 100644 passbook/lib/mixins.py diff --git a/passbook/core/forms/authentication.py b/passbook/core/forms/authentication.py index febe59e9e..522f90c55 100644 --- a/passbook/core/forms/authentication.py +++ b/passbook/core/forms/authentication.py @@ -47,10 +47,6 @@ class SignUpForm(forms.Form): widget=forms.PasswordInput(attrs={ 'placeholder': _('Repeat Password') })) - # captcha = ReCaptchaField( - # required=(not settings.DEBUG and not settings.TEST), - # private_key=Setting.get('recaptcha:private'), - # public_key=Setting.get('recaptcha:public')) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -58,9 +54,6 @@ class SignUpForm(forms.Form): if 'initial' in kwargs: for field in kwargs.get('initial').keys(): self.fields[field].widget.attrs['readonly'] = 'readonly' - # TODO: Dynamically add captcha here - # if not Setting.get_bool('recaptcha:enabled'): - # self.fields.pop('captcha') def clean_username(self): """Check if username is used already""" diff --git a/passbook/lib/mixins.py b/passbook/lib/mixins.py new file mode 100644 index 000000000..c83b0676a --- /dev/null +++ b/passbook/lib/mixins.py @@ -0,0 +1,12 @@ +"""passbook util mixins""" +from django.utils.decorators import method_decorator +from django.views.decorators.csrf import csrf_exempt + + +class CSRFExemptMixin: + """wrapper to apply @csrf_exempt to CBV""" + + @method_decorator(csrf_exempt) + def dispatch(self, *args, **kwargs): + """wrapper to apply @csrf_exempt to CBV""" + return super().dispatch(*args, **kwargs) diff --git a/passbook/saml_idp/urls.py b/passbook/saml_idp/urls.py index 00bfc91d6..051f8c490 100644 --- a/passbook/saml_idp/urls.py +++ b/passbook/saml_idp/urls.py @@ -4,9 +4,8 @@ from django.conf.urls import url from passbook.saml_idp import views urlpatterns = [ - url(r'^login/$', views.login_begin, name="saml_login_begin"), - url(r'^login/process/$', views.login_process, name='saml_login_process'), - url(r'^logout/$', views.logout, name="saml_logout"), - url(r'^metadata/xml/$', views.descriptor, name='metadata_xml'), - # url(r'^settings/$', views.IDPSettingsView.as_view(), name='admin_settings'), + url(r'^login/$', views.LoginBeginView.as_view(), name="saml_login_begin"), + url(r'^login/process/$', views.LoginProcessView.as_view(), name='saml_login_process'), + url(r'^logout/$', views.LogoutView.as_view(), name="saml_logout"), + url(r'^metadata/xml/$', views.DescriptorView.as_view(), name='metadata_xml'), ] diff --git a/passbook/saml_idp/views.py b/passbook/saml_idp/views.py index 2193bdce2..666f66c33 100644 --- a/passbook/saml_idp/views.py +++ b/passbook/saml_idp/views.py @@ -1,25 +1,25 @@ """passbook SAML IDP Views""" from logging import getLogger -from django.contrib import auth -from django.contrib.auth.decorators import login_required +from django.contrib.auth import logout +from django.contrib.auth.mixins import LoginRequiredMixin from django.core.exceptions import ValidationError from django.core.validators import URLValidator -from django.http import (HttpResponse, HttpResponseBadRequest, - HttpResponseRedirect) -from django.shortcuts import redirect, render -from django.urls import reverse +from django.http import HttpResponse, HttpResponseBadRequest +from django.shortcuts import get_object_or_404, redirect, render, reverse from django.utils.datastructures import MultiValueDictKeyError -# from django.utils.html import escape -# from django.utils.translation import ugettext as _ -from django.views.decorators.csrf import csrf_exempt +from django.views import View from passbook.lib.config import CONFIG +# from django.utils.html import escape +# from django.utils.translation import ugettext as _ +from passbook.lib.mixins import CSRFExemptMixin # from passbook.core.models import Event, Setting, UserAcquirableRelationship from passbook.lib.utils.template import render_to_string # from passbook.core.views.common import ErrorResponseView # from passbook.core.views.settings import GenericSettingView from passbook.saml_idp import exceptions, registry +from passbook.saml_idp.models import SAMLProvider # from OpenSSL.crypto import FILETYPE_PEM # from OpenSSL.crypto import Error as CryptoError @@ -47,144 +47,157 @@ def render_xml(request, template, ctx): return render(request, template, context=ctx, content_type="application/xml") -@csrf_exempt -def login_begin(request): +class LoginBeginView(CSRFExemptMixin, View): """Receives a SAML 2.0 AuthnRequest from a Service Provider and stores it in the session prior to enforcing login.""" - if request.method == 'POST': - source = request.POST - else: - source = request.GET - # Store these values now, because Django's login cycle won't preserve them. - try: - request.session['SAMLRequest'] = source['SAMLRequest'] - except (KeyError, MultiValueDictKeyError): - return HttpResponseBadRequest('the SAML request payload is missing') + def dispatch(self, request): + if request.method == 'POST': + source = request.POST + else: + source = request.GET + # Store these values now, because Django's login cycle won't preserve them. - request.session['RelayState'] = source.get('RelayState', '') - return redirect(reverse('passbook_saml_idp:saml_login_process')) + try: + request.session['SAMLRequest'] = source['SAMLRequest'] + except (KeyError, MultiValueDictKeyError): + return HttpResponseBadRequest('the SAML request payload is missing') + + request.session['RelayState'] = source.get('RelayState', '') + return redirect(reverse('passbook_saml_idp:saml_login_process')) -def redirect_to_sp(request, acs_url, saml_response, relay_state): +class RedirectToSPView(View): """Return autosubmit form""" - return render(request, 'core/autosubmit_form.html', { - 'url': acs_url, - 'attrs': { - 'SAMLResponse': saml_response, - 'RelayState': relay_state - } - }) + + def get(self, request, acs_url, saml_response, relay_state): + """Return autosubmit form""" + return render(request, 'core/autosubmit_form.html', { + 'url': acs_url, + 'attrs': { + 'SAMLResponse': saml_response, + 'RelayState': relay_state + } + }) -@login_required -def login_process(request): +class LoginProcessView(View): """Processor-based login continuation. Presents a SAML 2.0 Assertion for POSTing back to the Service Provider.""" - LOGGER.debug("Request: %s", request) - proc, remote = registry.find_processor(request) - # Check if user has access - access = True - # if remote.productextensionsaml2_set.exists() and \ - # remote.productextensionsaml2_set.first().product_set.exists(): - # # Only check if there is a connection from OAuth2 Application to product - # product = remote.productextensionsaml2_set.first().product_set.first() - # relationship = UserAcquirableRelationship.objects.filter(user=request.user, model=product) - # # Product is invitation_only = True and no relation with user exists - # if product.invitation_only and not relationship.exists(): - # access = False - # Check if we should just autosubmit - if remote.skip_authorization and access: - # full_res = _generate_response(request, proc, remote) - ctx = proc.generate_response() - # User accepted request - # Event.create( - # user=request.user, - # message=_('You authenticated %s (via SAML) (skipped Authz)' % remote.name), - # request=request, - # current=False, - # hidden=True) - return redirect_to_sp( - request=request, - acs_url=ctx['acs_url'], - saml_response=ctx['saml_response'], - relay_state=ctx['relay_state']) - if request.method == 'POST' and request.POST.get('ACSUrl', None) and access: - # User accepted request - # Event.create( - # user=request.user, - # message=_('You authenticated %s (via SAML)' % remote.name), - # request=request, - # current=False, - # hidden=True) - return redirect_to_sp( - request=request, - acs_url=request.POST.get('ACSUrl'), - saml_response=request.POST.get('SAMLResponse'), - relay_state=request.POST.get('RelayState')) - try: - full_res = _generate_response(request, proc, remote) - # if not access: - # LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product) - # messages.error(request, "You have no access to '%s'" % product.name) - # raise Http404 - return full_res - except exceptions.CannotHandleAssertion as exc: - LOGGER.debug(exc) - # return ErrorResponseView.as_view()(request, str(exc)) + + def dispatch(self, request): + LOGGER.debug("Request: %s", request) + proc, provider = registry.find_processor(request) + # Check if user has access + access = True + # if provider.productextensionsaml2_set.exists() and \ + # provider.productextensionsaml2_set.first().product_set.exists(): + # # Only check if there is a connection from OAuth2 Application to product + # product = provider.productextensionsaml2_set.first().product_set.first() + # relationship = UserAcquirableRelationship.objects. + # filter(user=request.user, model=product) + # # Product is invitation_only = True and no relation with user exists + # if product.invitation_only and not relationship.exists(): + # access = False + # Check if we should just autosubmit + if provider.skip_authorization and access: + # full_res = _generate_response(request, proc, provider) + ctx = proc.generate_response() + # User accepted request + # Event.create( + # user=request.user, + # message=_('You authenticated %s (via SAML) (skipped Authz)' % provider.name), + # request=request, + # current=False, + # hidden=True) + return RedirectToSPView.as_view()( + request=request, + acs_url=ctx['acs_url'], + saml_response=ctx['saml_response'], + relay_state=ctx['relay_state']) + if request.method == 'POST' and request.POST.get('ACSUrl', None) and access: + # User accepted request + # Event.create( + # user=request.user, + # message=_('You authenticated %s (via SAML)' % provider.name), + # request=request, + # current=False, + # hidden=True) + return RedirectToSPView.as_view()( + request=request, + acs_url=request.POST.get('ACSUrl'), + saml_response=request.POST.get('SAMLResponse'), + relay_state=request.POST.get('RelayState')) + try: + full_res = _generate_response(request, proc, provider) + # if not access: + # LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product) + # messages.error(request, "You have no access to '%s'" % product.name) + # raise Http404 + return full_res + except exceptions.CannotHandleAssertion as exc: + LOGGER.debug(exc) + # return ErrorResponseView.as_view()(request, str(exc)) -@csrf_exempt -def logout(request): +class LogoutView(CSRFExemptMixin, View): """Allows a non-SAML 2.0 URL to log out the user and returns a standard logged-out page. (SalesForce and others use this method, though it's technically not SAML 2.0).""" - auth.logout(request) - redirect_url = request.GET.get('redirect_to', '') + def get(self, request): + """Perform logout""" + logout(request) - try: - URL_VALIDATOR(redirect_url) - except ValidationError: - pass - else: - return HttpResponseRedirect(redirect_url) + redirect_url = request.GET.get('redirect_to', '') - return render(request, 'saml/idp/logged_out.html') + try: + URL_VALIDATOR(redirect_url) + except ValidationError: + pass + else: + return redirect(redirect_url) + + return render(request, 'saml/idp/logged_out.html') -@login_required -@csrf_exempt -def slo_logout(request): +class SLOLogout(CSRFExemptMixin, LoginRequiredMixin, View): """Receives a SAML 2.0 LogoutRequest from a Service Provider, logs out the user and returns a standard logged-out page.""" - request.session['SAMLRequest'] = request.POST['SAMLRequest'] - # TODO: Parse SAML LogoutRequest from POST data, similar to login_process(). - # TODO: Add a URL dispatch for this view. - # TODO: Modify the base processor to handle logouts? - # TODO: Combine this with login_process(), since they are so very similar? - # TODO: Format a LogoutResponse and return it to the browser. - # XXX: For now, simply log out without validating the request. - auth.logout(request) - return render(request, 'saml/idp/logged_out.html') + + def post(self, request): + """Perform logout""" + request.session['SAMLRequest'] = request.POST['SAMLRequest'] + # TODO: Parse SAML LogoutRequest from POST data, similar to login_process(). + # TODO: Add a URL dispatch for this view. + # TODO: Modify the base processor to handle logouts? + # TODO: Combine this with login_process(), since they are so very similar? + # TODO: Format a LogoutResponse and return it to the browser. + # XXX: For now, simply log out without validating the request. + logout(request) + return render(request, 'saml/idp/logged_out.html') -def descriptor(request): +class DescriptorView(View): """Replies with the XML Metadata IDSSODescriptor.""" - entity_id = CONFIG.y('saml_idp.issuer') - slo_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_logout')) - sso_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_login_begin')) - pubkey = '' # TODO: Extract application/provider for pubkey - ctx = { - 'entity_id': entity_id, - 'cert_public_key': pubkey, - 'slo_url': slo_url, - 'sso_url': sso_url - } - metadata = render_to_string('saml/xml/metadata.xml', ctx) - response = HttpResponse(metadata, content_type='application/xml') - response['Content-Disposition'] = 'attachment; filename="passbook_metadata.xml' - return response + + def get(self, request, application_id): + """Replies with the XML Metadata IDSSODescriptor.""" + application = get_object_or_404(SAMLProvider, pk=application_id) + entity_id = CONFIG.y('saml_idp.issuer') + slo_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_logout')) + sso_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_login_begin')) + pubkey = application.signing_cert + ctx = { + 'entity_id': entity_id, + 'cert_public_key': pubkey, + 'slo_url': slo_url, + 'sso_url': sso_url + } + metadata = render_to_string('saml/xml/metadata.xml', ctx) + response = HttpResponse(metadata, content_type='application/xml') + response['Content-Disposition'] = 'attachment; filename="passbook_metadata.xml' + return response # class IDPSettingsView(GenericSettingView):