code is now clean but still not working
This commit is contained in:
parent
c1276e9695
commit
b5bc371a04
|
@ -1 +1,2 @@
|
|||
"""passbook"""
|
||||
__version__ = '0.0.1-alpha'
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
from django.db.models import Model
|
||||
from rest_framework.serializers import ModelSerializer
|
||||
"""passbook admin api utils"""
|
||||
# from django.db.models import Model
|
||||
# from rest_framework.serializers import ModelSerializer
|
||||
|
||||
|
||||
class LookupSerializer(ModelSerializer):
|
||||
# class LookupSerializer(ModelSerializer):
|
||||
|
||||
mapping = {}
|
||||
# mapping = {}
|
||||
|
||||
def to_representation(self, instance):
|
||||
for __model, __serializer in self.mapping.items():
|
||||
if isinstance(instance, __model):
|
||||
return __serializer(instance=instance).to_representation(instance)
|
||||
raise KeyError(instance.__class__.__name__)
|
||||
# def to_representation(self, instance):
|
||||
# for __model, __serializer in self.mapping.items():
|
||||
# if isinstance(instance, __model):
|
||||
# return __serializer(instance=instance).to_representation(instance)
|
||||
# raise KeyError(instance.__class__.__name__)
|
||||
|
||||
class Meta:
|
||||
model = Model
|
||||
fields = '__all__'
|
||||
# class Meta:
|
||||
# model = Model
|
||||
# fields = '__all__'
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
|
||||
"""passbook admin mixins"""
|
||||
from django.contrib.auth.mixins import UserPassesTestMixin
|
||||
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
"""passbook URL Configuration"""
|
||||
from django.urls import include, path
|
||||
from django.urls import path
|
||||
|
||||
from passbook.admin.views import applications, overview, sources
|
||||
|
||||
|
|
|
@ -1,17 +1,20 @@
|
|||
"""passbook application administration"""
|
||||
|
||||
from django.views.generic import CreateView, DeleteView, ListView, UpdateView
|
||||
from django.views.generic import CreateView, ListView
|
||||
|
||||
from passbook.admin.mixins import AdminRequiredMixin
|
||||
from passbook.core.models import Application
|
||||
|
||||
|
||||
class ApplicationListView(AdminRequiredMixin, ListView):
|
||||
"""List all applications"""
|
||||
|
||||
model = Application
|
||||
template_name = 'administration/application/list.html'
|
||||
|
||||
|
||||
class ApplicationCreateView(AdminRequiredMixin, CreateView):
|
||||
"""Create new application"""
|
||||
|
||||
model = Application
|
||||
template_name = 'administration/application/create.html'
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
"""passbook administration overview"""
|
||||
from django.views.generic import TemplateView
|
||||
|
||||
from passbook.admin.mixins import AdminRequiredMixin
|
||||
from passbook.core.models import Application, Rule, User, Provider
|
||||
from passbook.core.models import Application, Provider, Rule, User
|
||||
|
||||
|
||||
class AdministrationOverviewView(AdminRequiredMixin, TemplateView):
|
||||
"""Overview View"""
|
||||
|
||||
template_name = 'administration/overview.html'
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
from django.contrib.messages.views import SuccessMessageMixin
|
||||
from django.urls import reverse_lazy
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.views.generic import CreateView, DeleteView, ListView, UpdateView
|
||||
from django.views.generic import CreateView, ListView, UpdateView
|
||||
|
||||
from passbook.admin.mixins import AdminRequiredMixin
|
||||
from passbook.core.models import Source
|
||||
|
@ -10,6 +10,7 @@ from passbook.lib.utils.reflection import path_to_class
|
|||
|
||||
|
||||
class SourceListView(AdminRequiredMixin, ListView):
|
||||
"""Show list of all sources"""
|
||||
|
||||
model = Source
|
||||
template_name = 'administration/source/list.html'
|
||||
|
@ -21,6 +22,7 @@ class SourceListView(AdminRequiredMixin, ListView):
|
|||
|
||||
|
||||
class SourceCreateView(SuccessMessageMixin, AdminRequiredMixin, CreateView):
|
||||
"""Create new Source"""
|
||||
|
||||
template_name = 'generic/create.html'
|
||||
success_url = reverse_lazy('passbook_admin:sources')
|
||||
|
@ -33,6 +35,7 @@ class SourceCreateView(SuccessMessageMixin, AdminRequiredMixin, CreateView):
|
|||
|
||||
|
||||
class SourceUpdateView(SuccessMessageMixin, AdminRequiredMixin, UpdateView):
|
||||
"""Update source"""
|
||||
|
||||
model = Source
|
||||
template_name = 'generic/update.html'
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
# Generated by Django 2.1.3 on 2018-11-16 10:21
|
||||
|
||||
from django.conf import settings
|
||||
import uuid
|
||||
|
||||
import django.contrib.auth.models
|
||||
import django.contrib.auth.validators
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
import django.utils.timezone
|
||||
import uuid
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
@ -6,7 +6,7 @@ from django.contrib import messages
|
|||
from django.contrib.auth import authenticate, login
|
||||
from django.contrib.auth.mixins import UserPassesTestMixin
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.shortcuts import redirect, render, reverse
|
||||
from django.shortcuts import redirect, reverse
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.views.generic import FormView
|
||||
|
||||
|
@ -89,14 +89,14 @@ class LoginView(UserPassesTestMixin, FormView):
|
|||
@staticmethod
|
||||
def invalid_login(request: HttpRequest, disabled_user: User = None) -> HttpResponse:
|
||||
"""Handle login for disabled users/invalid login attempts"""
|
||||
if disabled_user:
|
||||
context = {
|
||||
'reason': 'disabled',
|
||||
'user': disabled_user
|
||||
}
|
||||
else:
|
||||
context = {
|
||||
'reason': 'invalid',
|
||||
}
|
||||
# if disabled_user:
|
||||
# context = {
|
||||
# 'reason': 'disabled',
|
||||
# 'user': disabled_user
|
||||
# }
|
||||
# else:
|
||||
# context = {
|
||||
# 'reason': 'invalid',
|
||||
# }
|
||||
raise NotImplementedError()
|
||||
return render(request, 'login/invalid.html', context)
|
||||
# return render(request, 'login/invalid.html', context)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
"""passbook lib config loader"""
|
||||
import os
|
||||
from collections import Mapping
|
||||
from collections.abc import Mapping
|
||||
from contextlib import contextmanager
|
||||
from glob import glob
|
||||
from logging import getLogger
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
"""passbook decorators"""
|
||||
from time import time as timestamp
|
||||
|
||||
from django.conf import settings
|
||||
from django.shortcuts import redirect
|
||||
from django.urls import reverse
|
||||
from django.utils.functional import wraps
|
||||
from django.utils.http import urlencode
|
||||
|
||||
RE_AUTH_KEY = getattr(settings, 'RE_AUTH_KEY', 'passbook_require_re_auth_done')
|
||||
RE_AUTH_MARGAIN = getattr(settings, 'RE_AUTH_MARGAIN', 300)
|
||||
|
||||
|
||||
def reauth_required(view_function):
|
||||
"""Decorator to force a re-authentication before continuing"""
|
||||
|
||||
@wraps(view_function)
|
||||
def wrap(*args, **kwargs):
|
||||
"""check if user just authenticated or not"""
|
||||
|
||||
request = args[0] if args else None
|
||||
# Check if user is authenticated at all
|
||||
if not request or not request.user or not request.user.is_authenticated:
|
||||
return redirect(reverse('account-login'))
|
||||
|
||||
now = timestamp()
|
||||
|
||||
if RE_AUTH_KEY in request.session and \
|
||||
request.session[RE_AUTH_KEY] < (now - RE_AUTH_MARGAIN):
|
||||
# Timestamp in session but expired
|
||||
del request.session[RE_AUTH_KEY]
|
||||
|
||||
if RE_AUTH_KEY not in request.session:
|
||||
# Timestamp not in session, force user to reauth
|
||||
return redirect(reverse('account-reauth') + '?' +
|
||||
urlencode({'next': request.path}))
|
||||
|
||||
if RE_AUTH_KEY in request.session and \
|
||||
request.session[RE_AUTH_KEY] >= (now - RE_AUTH_MARGAIN) and \
|
||||
request.session[RE_AUTH_KEY] <= now:
|
||||
# Timestamp in session and valid
|
||||
return view_function(*args, **kwargs)
|
||||
|
||||
# This should never be reached, just return False
|
||||
return False # pragma: no cover
|
||||
return wrap
|
|
@ -1,7 +1,7 @@
|
|||
# Generated by Django 2.1.3 on 2018-11-16 10:21
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# Generated by Django 2.1.3 on 2018-11-16 10:21
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
from django.urls import include, path
|
||||
|
||||
from passbook.oauth_provider.views import oauth2
|
||||
# from passbook.oauth_provider.views import oauth2
|
||||
|
||||
urlpatterns = [
|
||||
# Custom OAuth 2 Authorize View
|
||||
|
|
|
@ -1,58 +1,58 @@
|
|||
"""passbook OAuth2 Views"""
|
||||
|
||||
from logging import getLogger
|
||||
# from logging import getLogger
|
||||
|
||||
from django.contrib import messages
|
||||
from django.http import Http404, HttpResponseRedirect
|
||||
from django.utils.translation import ugettext as _
|
||||
from oauth2_provider.models import get_application_model
|
||||
from oauth2_provider.views.base import AuthorizationView
|
||||
# from django.contrib import messages
|
||||
# from django.http import Http404, HttpResponseRedirect
|
||||
# from django.utils.translation import ugettext as _
|
||||
# from oauth2_provider.models import get_application_model
|
||||
# from oauth2_provider.views.base import AuthorizationView
|
||||
|
||||
# from passbook.core.models import Event, UserAcquirableRelationship
|
||||
# # from passbook.core.models import Event, UserAcquirableRelationship
|
||||
|
||||
LOGGER = getLogger(__name__)
|
||||
# LOGGER = getLogger(__name__)
|
||||
|
||||
|
||||
class PassbookAuthorizationView(AuthorizationView):
|
||||
"""Custom OAuth2 Authorization View which checks for invite_only products"""
|
||||
# class PassbookAuthorizationView(AuthorizationView):
|
||||
# """Custom OAuth2 Authorization View which checks for invite_only products"""
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
"""Check if request.user has a relationship with product"""
|
||||
full_res = super().get(request, *args, **kwargs)
|
||||
# If application cannot be found, oauth2_data is {}
|
||||
if self.oauth2_data == {}:
|
||||
return full_res
|
||||
# self.oauth2_data['application'] should be set, if not an error occured
|
||||
# if 'application' in self.oauth2_data:
|
||||
# app = self.oauth2_data['application']
|
||||
# if app.productextensionoauth2_set.exists() and \
|
||||
# app.productextensionoauth2_set.first().product_set.exists():
|
||||
# # Only check if there is a connection from OAuth2 Application to product
|
||||
# product = app.productextensionoauth2_set.first().product_set.first()
|
||||
# relationship = UserAcquirableRelationship.objects.filter(user=request.user,
|
||||
# model=product)
|
||||
# # Product is invite_only = True and no relation with user exists
|
||||
# if product.invite_only and not relationship.exists():
|
||||
# LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
|
||||
# messages.error(request, "You have no access to '%s'" % product.name)
|
||||
# raise Http404
|
||||
# if isinstance(full_res, HttpResponseRedirect):
|
||||
# # Application has skip authorization on
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via OAuth) (skipped Authz)' % app.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
return full_res
|
||||
# def get(self, request, *args, **kwargs):
|
||||
# """Check if request.user has a relationship with product"""
|
||||
# full_res = super().get(request, *args, **kwargs)
|
||||
# # If application cannot be found, oauth2_data is {}
|
||||
# if self.oauth2_data == {}:
|
||||
# return full_res
|
||||
# # self.oauth2_data['application'] should be set, if not an error occured
|
||||
# # if 'application' in self.oauth2_data:
|
||||
# # app = self.oauth2_data['application']
|
||||
# # if app.productextensionoauth2_set.exists() and \
|
||||
# # app.productextensionoauth2_set.first().product_set.exists():
|
||||
# # # Only check if there is a connection from OAuth2 Application to product
|
||||
# # product = app.productextensionoauth2_set.first().product_set.first()
|
||||
# # relationship = UserAcquirableRelationship.objects.filter(user=request.user,
|
||||
# # model=product)
|
||||
# # # Product is invite_only = True and no relation with user exists
|
||||
# # if product.invite_only and not relationship.exists():
|
||||
# # LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
|
||||
# # messages.error(request, "You have no access to '%s'" % product.name)
|
||||
# # raise Http404
|
||||
# # if isinstance(full_res, HttpResponseRedirect):
|
||||
# # # Application has skip authorization on
|
||||
# # Event.create(
|
||||
# # user=request.user,
|
||||
# # message=_('You authenticated %s (via OAuth) (skipped Authz)' % app.name),
|
||||
# # request=request,
|
||||
# # current=False,
|
||||
# # hidden=True)
|
||||
# return full_res
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
"""Add event on confirmation"""
|
||||
app = get_application_model().objects.get(client_id=request.GET["client_id"])
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via OAuth)' % app.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
return super().post(request, *args, **kwargs)
|
||||
# def post(self, request, *args, **kwargs):
|
||||
# """Add event on confirmation"""
|
||||
# app = get_application_model().objects.get(client_id=request.GET["client_id"])
|
||||
# # Event.create(
|
||||
# # user=request.user,
|
||||
# # message=_('You authenticated %s (via OAuth)' % app.name),
|
||||
# # request=request,
|
||||
# # current=False,
|
||||
# # hidden=True)
|
||||
# return super().post(request, *args, **kwargs)
|
||||
|
|
|
@ -84,7 +84,8 @@ class Processor:
|
|||
'AUTH_INSTANT': get_time_string(),
|
||||
'ISSUE_INSTANT': get_time_string(),
|
||||
'NOT_BEFORE': get_time_string(-1 * HOURS), # TODO: Make these settings.
|
||||
'NOT_ON_OR_AFTER': get_time_string(int(CONFIG.y('saml_idp.assertion_valid_for')) * MINUTES),
|
||||
'NOT_ON_OR_AFTER': get_time_string(int(CONFIG.y('saml_idp.assertion_valid_for'))
|
||||
* MINUTES),
|
||||
'SESSION_INDEX': self._session_index,
|
||||
'SESSION_NOT_ON_OR_AFTER': get_time_string(8 * HOURS),
|
||||
'SP_NAME_QUALIFIER': self._audience,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
# Generated by Django 2.1.3 on 2018-11-16 10:21
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.db.models.deletion
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
@ -21,3 +21,6 @@ class SAMLApplication(Application):
|
|||
|
||||
def __str__(self):
|
||||
return "SAMLApplication %s (processor=%s)" % (self.name, self.processor_path)
|
||||
|
||||
def user_is_authorized(self):
|
||||
raise NotImplementedError()
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
"""Shibboleth Processor"""
|
||||
|
||||
from supervisr.mod.auth.saml.idp.base import Processor
|
||||
from passbook.saml_idp.base import Processor
|
||||
|
||||
|
||||
class ShibbolethProcessor(Processor):
|
||||
|
|
|
@ -1,28 +1,31 @@
|
|||
"""passbook SAML IDP Views"""
|
||||
from logging import getLogger
|
||||
|
||||
from django.contrib import auth, messages
|
||||
from django.contrib import auth
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.validators import URLValidator
|
||||
from django.http import (Http404, HttpResponse, HttpResponseBadRequest,
|
||||
from django.http import (HttpResponse, HttpResponseBadRequest,
|
||||
HttpResponseRedirect)
|
||||
from django.shortcuts import redirect, render
|
||||
from django.urls import reverse
|
||||
from django.utils.datastructures import MultiValueDictKeyError
|
||||
from django.utils.html import escape
|
||||
from django.utils.translation import ugettext as _
|
||||
# from django.utils.html import escape
|
||||
# from django.utils.translation import ugettext as _
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from OpenSSL.crypto import FILETYPE_PEM
|
||||
from OpenSSL.crypto import Error as CryptoError
|
||||
from OpenSSL.crypto import load_certificate
|
||||
|
||||
from passbook.lib.config import CONFIG
|
||||
# from passbook.core.models import Event, Setting, UserAcquirableRelationship
|
||||
from passbook.lib.utils.template import render_to_string
|
||||
# from passbook.core.views.common import ErrorResponseView
|
||||
# from passbook.core.views.settings import GenericSettingView
|
||||
from passbook.saml_idp import exceptions, registry, xml_signing
|
||||
|
||||
# from OpenSSL.crypto import FILETYPE_PEM
|
||||
# from OpenSSL.crypto import Error as CryptoError
|
||||
# from OpenSSL.crypto import load_certificate
|
||||
|
||||
|
||||
LOGGER = getLogger(__name__)
|
||||
URL_VALIDATOR = URLValidator(schemes=('http', 'https'))
|
||||
|
||||
|
@ -82,25 +85,25 @@ def login_process(request):
|
|||
proc, remote = registry.find_processor(request)
|
||||
# Check if user has access
|
||||
access = True
|
||||
if remote.productextensionsaml2_set.exists() and \
|
||||
remote.productextensionsaml2_set.first().product_set.exists():
|
||||
# Only check if there is a connection from OAuth2 Application to product
|
||||
product = remote.productextensionsaml2_set.first().product_set.first()
|
||||
relationship = UserAcquirableRelationship.objects.filter(user=request.user, model=product)
|
||||
# Product is invite_only = True and no relation with user exists
|
||||
if product.invite_only and not relationship.exists():
|
||||
access = False
|
||||
# if remote.productextensionsaml2_set.exists() and \
|
||||
# remote.productextensionsaml2_set.first().product_set.exists():
|
||||
# # Only check if there is a connection from OAuth2 Application to product
|
||||
# product = remote.productextensionsaml2_set.first().product_set.first()
|
||||
# relationship = UserAcquirableRelationship.objects.filter(user=request.user, model=product)
|
||||
# # Product is invite_only = True and no relation with user exists
|
||||
# if product.invite_only and not relationship.exists():
|
||||
# access = False
|
||||
# Check if we should just autosubmit
|
||||
if remote.skip_authorization and access:
|
||||
# full_res = _generate_response(request, proc, remote)
|
||||
ctx = proc.generate_response()
|
||||
# User accepted request
|
||||
Event.create(
|
||||
user=request.user,
|
||||
message=_('You authenticated %s (via SAML) (skipped Authz)' % remote.name),
|
||||
request=request,
|
||||
current=False,
|
||||
hidden=True)
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via SAML) (skipped Authz)' % remote.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
return redirect_to_sp(
|
||||
request=request,
|
||||
acs_url=ctx['acs_url'],
|
||||
|
@ -108,12 +111,12 @@ def login_process(request):
|
|||
relay_state=ctx['relay_state'])
|
||||
if request.method == 'POST' and request.POST.get('ACSUrl', None) and access:
|
||||
# User accepted request
|
||||
Event.create(
|
||||
user=request.user,
|
||||
message=_('You authenticated %s (via SAML)' % remote.name),
|
||||
request=request,
|
||||
current=False,
|
||||
hidden=True)
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via SAML)' % remote.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
return redirect_to_sp(
|
||||
request=request,
|
||||
acs_url=request.POST.get('ACSUrl'),
|
||||
|
@ -121,13 +124,14 @@ def login_process(request):
|
|||
relay_state=request.POST.get('RelayState'))
|
||||
try:
|
||||
full_res = _generate_response(request, proc, remote)
|
||||
if not access:
|
||||
LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
|
||||
messages.error(request, "You have no access to '%s'" % product.name)
|
||||
raise Http404
|
||||
# if not access:
|
||||
# LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
|
||||
# messages.error(request, "You have no access to '%s'" % product.name)
|
||||
# raise Http404
|
||||
return full_res
|
||||
except exceptions.CannotHandleAssertion as exc:
|
||||
return ErrorResponseView.as_view()(request, str(exc))
|
||||
LOGGER.debug(exc)
|
||||
# return ErrorResponseView.as_view()(request, str(exc))
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
|
|
|
@ -6,7 +6,7 @@ from django.contrib.auth.models import AnonymousUser
|
|||
from django.test import RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
|
||||
from passbook.core.views import common
|
||||
from passbook.core.views import overview
|
||||
from passbook.tfa.middleware import tfa_force_verify
|
||||
|
||||
|
||||
|
@ -19,7 +19,7 @@ class TestMiddleware(TestCase):
|
|||
|
||||
def test_tfa_force_verify_anon(self):
|
||||
"""Test Anonymous TFA Force"""
|
||||
request = self.factory.get(reverse('common-index'))
|
||||
request = self.factory.get(reverse('passbook_core:overview'))
|
||||
request.user = AnonymousUser()
|
||||
response = tfa_force_verify(common.IndexView.as_view())(request)
|
||||
response = tfa_force_verify(overview.OverviewView.as_view())(request)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
|
|
@ -8,7 +8,7 @@ urlpatterns = [
|
|||
url(r'^$', views.index, name='tfa-index'),
|
||||
url(r'qr/$', views.qr_code, name='tfa-qr'),
|
||||
url(r'verify/$', views.verify, name='tfa-verify'),
|
||||
url(r'enable/$', views.TFASetupView.as_view(), name='tfa-enable'),
|
||||
# url(r'enable/$', views.TFASetupView.as_view(), name='tfa-enable'),
|
||||
url(r'disable/$', views.disable, name='tfa-disable'),
|
||||
url(r'user_settings/$', views.user_settings, name='tfa-user_settings'),
|
||||
]
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
"""passbook 2FA Views"""
|
||||
from base64 import b32encode
|
||||
from binascii import unhexlify
|
||||
# from base64 import b32encode
|
||||
# from binascii import unhexlify
|
||||
|
||||
from django.contrib import messages
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.http import Http404, HttpRequest, HttpResponse
|
||||
from django.shortcuts import get_object_or_404, redirect, render
|
||||
from django.urls import reverse
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.utils.translation import ugettext as _
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django_otp import login, match_token, user_has_device
|
||||
|
@ -17,14 +16,13 @@ from django_otp.plugins.otp_totp.models import TOTPDevice
|
|||
from qrcode import make as qr_make
|
||||
from qrcode.image.svg import SvgPathImage
|
||||
|
||||
from passbook.core.decorators import reauth_required
|
||||
from passbook.core.models import Event
|
||||
from passbook.core.views.wizards import BaseWizardView
|
||||
from passbook.mod.tfa.forms import (TFASetupInitForm, TFASetupStaticForm,
|
||||
TFAVerifyForm)
|
||||
from passbook.mod.tfa.utils import otpauth_url
|
||||
from passbook.lib.decorators import reauth_required
|
||||
# from passbook.core.models import Event
|
||||
# from passbook.core.views.wizards import BaseWizardView
|
||||
from passbook.tfa.forms import TFAVerifyForm
|
||||
from passbook.tfa.utils import otpauth_url
|
||||
|
||||
TFA_SESSION_KEY = 'passbook_mod_2fa_key'
|
||||
TFA_SESSION_KEY = 'passbook_2fa_key'
|
||||
|
||||
|
||||
@login_required
|
||||
|
@ -96,99 +94,99 @@ def disable(request: HttpRequest) -> HttpResponse:
|
|||
token.delete()
|
||||
messages.success(request, 'Successfully disabled 2FA')
|
||||
# Create event with email notification
|
||||
Event.create(
|
||||
user=request.user,
|
||||
message=_('You disabled 2FA.'),
|
||||
current=True,
|
||||
request=request,
|
||||
send_notification=True)
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You disabled 2FA.'),
|
||||
# current=True,
|
||||
# request=request,
|
||||
# send_notification=True)
|
||||
return redirect(reverse('common-index'))
|
||||
|
||||
|
||||
# pylint: disable=too-many-ancestors
|
||||
@method_decorator([login_required, reauth_required], name="dispatch")
|
||||
class TFASetupView(BaseWizardView):
|
||||
"""Wizard to create a Mail Account"""
|
||||
# # pylint: disable=too-many-ancestors
|
||||
# @method_decorator([login_required, reauth_required], name="dispatch")
|
||||
# class TFASetupView(BaseWizardView):
|
||||
# """Wizard to create a Mail Account"""
|
||||
|
||||
title = _('Set up 2FA')
|
||||
form_list = [TFASetupInitForm, TFASetupStaticForm]
|
||||
# title = _('Set up 2FA')
|
||||
# form_list = [TFASetupInitForm, TFASetupStaticForm]
|
||||
|
||||
totp_device = None
|
||||
static_device = None
|
||||
confirmed = False
|
||||
# totp_device = None
|
||||
# static_device = None
|
||||
# confirmed = False
|
||||
|
||||
def get_template_names(self):
|
||||
if self.steps.current == '1':
|
||||
return 'tfa/wizard_setup_static.html'
|
||||
return self.template_name
|
||||
# def get_template_names(self):
|
||||
# if self.steps.current == '1':
|
||||
# return 'tfa/wizard_setup_static.html'
|
||||
# return self.template_name
|
||||
|
||||
def handle_request(self, request: HttpRequest):
|
||||
# Check if user has 2FA setup already
|
||||
finished_totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=True)
|
||||
finished_static_devices = StaticDevice.objects.filter(user=request.user, confirmed=True)
|
||||
if finished_totp_devices.exists() or finished_static_devices.exists():
|
||||
messages.error(request, _('You already have 2FA enabled!'))
|
||||
return redirect(reverse('common-index'))
|
||||
# Check if there's an unconfirmed device left to set up
|
||||
totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=False)
|
||||
if not totp_devices.exists():
|
||||
# Create new TOTPDevice and save it, but not confirm it
|
||||
self.totp_device = TOTPDevice(user=request.user, confirmed=False)
|
||||
self.totp_device.save()
|
||||
else:
|
||||
self.totp_device = totp_devices.first()
|
||||
# def handle_request(self, request: HttpRequest):
|
||||
# # Check if user has 2FA setup already
|
||||
# finished_totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=True)
|
||||
# finished_static_devices = StaticDevice.objects.filter(user=request.user, confirmed=True)
|
||||
# if finished_totp_devices.exists() or finished_static_devices.exists():
|
||||
# messages.error(request, _('You already have 2FA enabled!'))
|
||||
# return redirect(reverse('common-index'))
|
||||
# # Check if there's an unconfirmed device left to set up
|
||||
# totp_devices = TOTPDevice.objects.filter(user=request.user, confirmed=False)
|
||||
# if not totp_devices.exists():
|
||||
# # Create new TOTPDevice and save it, but not confirm it
|
||||
# self.totp_device = TOTPDevice(user=request.user, confirmed=False)
|
||||
# self.totp_device.save()
|
||||
# else:
|
||||
# self.totp_device = totp_devices.first()
|
||||
|
||||
# Check if we have a static device already
|
||||
static_devices = StaticDevice.objects.filter(user=request.user, confirmed=False)
|
||||
if not static_devices.exists():
|
||||
# Create new static device and some codes
|
||||
self.static_device = StaticDevice(user=request.user, confirmed=False)
|
||||
self.static_device.save()
|
||||
# Create 9 tokens and save them
|
||||
# pylint: disable=unused-variable
|
||||
for counter in range(0, 9):
|
||||
token = StaticToken(device=self.static_device, token=StaticToken.random_token())
|
||||
token.save()
|
||||
else:
|
||||
self.static_device = static_devices.first()
|
||||
# # Check if we have a static device already
|
||||
# static_devices = StaticDevice.objects.filter(user=request.user, confirmed=False)
|
||||
# if not static_devices.exists():
|
||||
# # Create new static device and some codes
|
||||
# self.static_device = StaticDevice(user=request.user, confirmed=False)
|
||||
# self.static_device.save()
|
||||
# # Create 9 tokens and save them
|
||||
# # pylint: disable=unused-variable
|
||||
# for counter in range(0, 9):
|
||||
# token = StaticToken(device=self.static_device, token=StaticToken.random_token())
|
||||
# token.save()
|
||||
# else:
|
||||
# self.static_device = static_devices.first()
|
||||
|
||||
# Somehow convert the generated key to base32 for the QR code
|
||||
rawkey = unhexlify(self.totp_device.key.encode('ascii'))
|
||||
request.session[TFA_SESSION_KEY] = b32encode(rawkey).decode("utf-8")
|
||||
return True
|
||||
# # Somehow convert the generated key to base32 for the QR code
|
||||
# rawkey = unhexlify(self.totp_device.key.encode('ascii'))
|
||||
# request.session[TFA_SESSION_KEY] = b32encode(rawkey).decode("utf-8")
|
||||
# return True
|
||||
|
||||
def get_form(self, step=None, data=None, files=None):
|
||||
form = super(TFASetupView, self).get_form(step, data, files)
|
||||
if step is None:
|
||||
step = self.steps.current
|
||||
if step == '0':
|
||||
form.confirmed = self.confirmed
|
||||
form.device = self.totp_device
|
||||
form.fields['qr_code'].initial = reverse('passbook_mod_tfa:tfa-qr')
|
||||
elif step == '1':
|
||||
# This is a bit of a hack, but the 2fa token from step 1 has been checked here
|
||||
# And we need to save it, otherwise it's going to fail in render_done
|
||||
# and we're going to be redirected to step0
|
||||
self.confirmed = True
|
||||
# def get_form(self, step=None, data=None, files=None):
|
||||
# form = super(TFASetupView, self).get_form(step, data, files)
|
||||
# if step is None:
|
||||
# step = self.steps.current
|
||||
# if step == '0':
|
||||
# form.confirmed = self.confirmed
|
||||
# form.device = self.totp_device
|
||||
# form.fields['qr_code'].initial = reverse('passbook_tfa:tfa-qr')
|
||||
# elif step == '1':
|
||||
# # This is a bit of a hack, but the 2fa token from step 1 has been checked here
|
||||
# # And we need to save it, otherwise it's going to fail in render_done
|
||||
# # and we're going to be redirected to step0
|
||||
# self.confirmed = True
|
||||
|
||||
tokens = [(x.token, x.token) for x in self.static_device.token_set.all()]
|
||||
form.fields['tokens'].choices = tokens
|
||||
return form
|
||||
# tokens = [(x.token, x.token) for x in self.static_device.token_set.all()]
|
||||
# form.fields['tokens'].choices = tokens
|
||||
# return form
|
||||
|
||||
def finish(self, *forms):
|
||||
# Save device as confirmed
|
||||
self.totp_device.confirmed = True
|
||||
self.totp_device.save()
|
||||
self.static_device.confirmed = True
|
||||
self.static_device.save()
|
||||
# Create event with email notification
|
||||
Event.create(
|
||||
user=self.request.user,
|
||||
message=_('You activated 2FA.'),
|
||||
current=True,
|
||||
request=self.request,
|
||||
send_notification=True)
|
||||
return redirect(reverse('passbook_mod_tfa:tfa-index'))
|
||||
# def finish(self, *forms):
|
||||
# # Save device as confirmed
|
||||
# self.totp_device.confirmed = True
|
||||
# self.totp_device.save()
|
||||
# self.static_device.confirmed = True
|
||||
# self.static_device.save()
|
||||
# # Create event with email notification
|
||||
# Event.create(
|
||||
# user=self.request.user,
|
||||
# message=_('You activated 2FA.'),
|
||||
# current=True,
|
||||
# request=self.request,
|
||||
# send_notification=True)
|
||||
# return redirect(reverse('passbook_tfa:tfa-index'))
|
||||
|
||||
|
||||
@never_cache
|
||||
|
@ -199,7 +197,7 @@ def qr_code(request: HttpRequest) -> HttpResponse:
|
|||
try:
|
||||
key = request.session[TFA_SESSION_KEY]
|
||||
except KeyError:
|
||||
raise Http404()
|
||||
raise Http404
|
||||
|
||||
url = otpauth_url(accountname=request.user.username, secret=key)
|
||||
# Make and return QR code
|
||||
|
|
Reference in New Issue