sources/saml: correctly cleanup transient users, update forms

This commit is contained in:
Jens Langhammer 2020-06-24 22:27:14 +02:00
parent 05999cb8c7
commit 31e0d74495
11 changed files with 175 additions and 47 deletions

View file

@ -290,6 +290,7 @@ class TestEnroll2Step(SeleniumTestCase):
)
self.driver.find_element(By.CSS_SELECTOR, "[role=enroll]").click()
self.wait.until(ec.presence_of_element_located((By.ID, "id_username")))
self.driver.find_element(By.ID, "id_username").send_keys("foo")
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password_repeat").send_keys(USER().username)

View file

@ -2,6 +2,7 @@
from rest_framework.serializers import ModelSerializer
from rest_framework.viewsets import ModelViewSet
from passbook.admin.forms.source import SOURCE_FORM_FIELDS
from passbook.sources.saml.models import SAMLSource
@ -11,12 +12,12 @@ class SAMLSourceSerializer(ModelSerializer):
class Meta:
model = SAMLSource
fields = [
"pk",
fields = SOURCE_FORM_FIELDS + [
"issuer",
"idp_url",
"idp_logout_url",
"auto_logout",
"sso_url",
"binding_type",
"slo_url",
"temporary_user_delete_after",
"signing_kp",
]

View file

@ -1,8 +1,6 @@
"""passbook SAML SP Forms"""
from django import forms
from django.contrib.admin.widgets import FilteredSelectMultiple
from django.utils.translation import gettext as _
from passbook.admin.forms.source import SOURCE_FORM_FIELDS
from passbook.sources.saml.models import SAMLSource
@ -16,17 +14,16 @@ class SAMLSourceForm(forms.ModelForm):
model = SAMLSource
fields = SOURCE_FORM_FIELDS + [
"issuer",
"sso_url",
"binding_type",
"idp_url",
"idp_logout_url",
"auto_logout",
"slo_url",
"temporary_user_delete_after",
"signing_kp",
]
widgets = {
"name": forms.TextInput(),
"policies": FilteredSelectMultiple(_("policies"), False),
"issuer": forms.TextInput(),
"idp_url": forms.TextInput(),
"idp_logout_url": forms.TextInput(),
"sso_url": forms.TextInput(),
"slo_url": forms.TextInput(),
"temporary_user_delete_after": forms.TextInput(),
}
labels = {"signing_kp": _("Singing Keypair")}

View file

@ -0,0 +1,65 @@
# Generated by Django 3.0.7 on 2020-06-24 19:57
import django.db.models.deletion
from django.db import migrations, models
import passbook.providers.saml.utils.time
class Migration(migrations.Migration):
dependencies = [
("passbook_crypto", "0002_create_self_signed_kp"),
("passbook_sources_saml", "0002_auto_20200523_2329"),
]
operations = [
migrations.RemoveField(model_name="samlsource", name="auto_logout",),
migrations.RenameField(
model_name="samlsource", old_name="idp_url", new_name="sso_url",
),
migrations.RenameField(
model_name="samlsource", old_name="idp_logout_url", new_name="slo_url",
),
migrations.AddField(
model_name="samlsource",
name="temporary_user_delete_after",
field=models.TextField(
default="days=1",
help_text="Time offset when temporary users should be deleted. This only applies if your IDP uses the NameID Format 'transient', and the user doesn't log out manually. (Format: hours=1;minutes=2;seconds=3).",
validators=[
passbook.providers.saml.utils.time.timedelta_string_validator
],
verbose_name="Delete temporary users after",
),
),
migrations.AlterField(
model_name="samlsource",
name="signing_kp",
field=models.ForeignKey(
help_text="Certificate Key Pair of the IdP which Assertion's Signature is validated against.",
on_delete=django.db.models.deletion.PROTECT,
to="passbook_crypto.CertificateKeyPair",
verbose_name="Singing Keypair",
),
),
migrations.AlterField(
model_name="samlsource",
name="slo_url",
field=models.URLField(
blank=True,
default=None,
help_text="Optional URL if your IDP supports Single-Logout.",
null=True,
verbose_name="SLO URL",
),
),
migrations.AlterField(
model_name="samlsource",
name="sso_url",
field=models.URLField(
help_text="URL that the initial Login request is sent to.",
verbose_name="SSO URL",
),
),
]

View file

@ -6,6 +6,7 @@ from django.utils.translation import gettext_lazy as _
from passbook.core.models import Source
from passbook.core.types import UILoginButton
from passbook.crypto.models import CertificateKeyPair
from passbook.providers.saml.utils.time import timedelta_string_validator
class SAMLBindingTypes(models.TextChoices):
@ -25,11 +26,9 @@ class SAMLSource(Source):
help_text=_("Also known as Entity ID. Defaults the Metadata URL."),
)
idp_url = models.URLField(
verbose_name=_("IDP URL"),
help_text=_(
"URL that the initial SAML Request is sent to. Also known as a Binding."
),
sso_url = models.URLField(
verbose_name=_("SSO URL"),
help_text=_("URL that the initial Login request is sent to."),
)
binding_type = models.CharField(
max_length=100,
@ -37,19 +36,34 @@ class SAMLSource(Source):
default=SAMLBindingTypes.Redirect,
)
idp_logout_url = models.URLField(
default=None, blank=True, null=True, verbose_name=_("IDP Logout URL")
slo_url = models.URLField(
default=None,
blank=True,
null=True,
verbose_name=_("SLO URL"),
help_text=_("Optional URL if your IDP supports Single-Logout."),
)
temporary_user_delete_after = models.TextField(
default="days=1",
verbose_name=_("Delete temporary users after"),
validators=[timedelta_string_validator],
help_text=_(
(
"Time offset when temporary users should be deleted. This only applies if your IDP "
"uses the NameID Format 'transient', and the user doesn't log out manually. "
"(Format: hours=1;minutes=2;seconds=3)."
)
),
)
auto_logout = models.BooleanField(default=False)
signing_kp = models.ForeignKey(
CertificateKeyPair,
default=None,
null=True,
verbose_name=_("Singing Keypair"),
help_text=_(
"Certificate Key Pair of the IdP which Assertions are validated against."
"Certificate Key Pair of the IdP which Assertion's Signature is validated against."
),
on_delete=models.SET_NULL,
on_delete=models.PROTECT,
)
form = "passbook.sources.saml.forms.SAMLSourceForm"

View file

@ -1,5 +1,5 @@
"""passbook saml source processor"""
from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict
from defusedxml import ElementTree
from django.http import HttpRequest, HttpResponse
@ -21,6 +21,13 @@ from passbook.sources.saml.exceptions import (
UnsupportedNameIDFormat,
)
from passbook.sources.saml.models import SAMLSource
from passbook.sources.saml.processors.constants import (
SAML_NAME_ID_FORMAT_EMAIL,
SAML_NAME_ID_FORMAT_PRESISTENT,
SAML_NAME_ID_FORMAT_TRANSIENT,
SAML_NAME_ID_FORMAT_WINDOWS,
SAML_NAME_ID_FORMAT_X509,
)
from passbook.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
from passbook.stages.prompt.stage import PLAN_CONTEXT_PROMPT
@ -100,19 +107,16 @@ class Processor:
name_id_el = self._get_name_id()
name_id = name_id_el.text
if not name_id:
raise UnsupportedNameIDFormat(f"Subject's NameID is empty.")
raise UnsupportedNameIDFormat("Subject's NameID is empty.")
_format = name_id_el.attrib["Format"]
if _format == "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress":
if _format == SAML_NAME_ID_FORMAT_EMAIL:
return {"email": name_id}
if _format == "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent":
if _format == SAML_NAME_ID_FORMAT_PRESISTENT:
return {"username": name_id}
if _format == "urn:oasis:names:tc:SAML:2.0:nameid-format:X509SubjectName":
if _format == SAML_NAME_ID_FORMAT_X509:
# This attribute is statically set by the LDAP source
return {"attributes__distinguishedName": name_id}
if (
_format
== "urn:oasis:names:tc:SAML:2.0:nameid-format:WindowsDomainQualifiedName"
):
if _format == SAML_NAME_ID_FORMAT_WINDOWS:
if "\\" in name_id:
name_id = name_id.split("\\")[1]
return {"username": name_id}
@ -124,10 +128,7 @@ class Processor:
"""Prepare flow plan depending on whether or not the user exists"""
name_id = self._get_name_id()
# transient NameIDs are handeled seperately as they don't have to go through flows.
if (
name_id.attrib["Format"]
== "urn:oasis:names:tc:SAML:2.0:nameid-format:transient"
):
if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
return self._handle_name_id_transient(request)
name_id_filter = self._get_name_id_filter()

View file

@ -0,0 +1,8 @@
"""SAML Source processor constants"""
SAML_NAME_ID_FORMAT_EMAIL = "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
SAML_NAME_ID_FORMAT_PRESISTENT = "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent"
SAML_NAME_ID_FORMAT_X509 = "urn:oasis:names:tc:SAML:2.0:nameid-format:X509SubjectName"
SAML_NAME_ID_FORMAT_WINDOWS = (
"urn:oasis:names:tc:SAML:2.0:nameid-format:WindowsDomainQualifiedName"
)
SAML_NAME_ID_FORMAT_TRANSIENT = "urn:oasis:names:tc:SAML:2.0:nameid-format:transient"

View file

@ -0,0 +1,9 @@
"""saml source settings"""
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"saml_source_cleanup": {
"task": "passbook.sources.saml.tasks.clean_temporary_users",
"schedule": crontab(minute="*/5"),
}
}

View file

@ -13,6 +13,8 @@ LOGGER = get_logger()
# pylint: disable=unused-argument
def on_user_logged_out(sender, request: HttpRequest, user: User, **_):
"""Delete temporary user if the `delete_on_logout` flag is enabled"""
if not user:
return
if "saml" in user.attributes:
if "delete_on_logout" in user.attributes["saml"]:
if user.attributes["saml"]["delete_on_logout"]:

View file

@ -0,0 +1,31 @@
"""passbook saml source tasks"""
from django.utils.timezone import now
from structlog import get_logger
from passbook.core.models import User
from passbook.providers.saml.utils.time import timedelta_from_string
from passbook.root.celery import CELERY_APP
from passbook.sources.saml.models import SAMLSource
LOGGER = get_logger()
@CELERY_APP.task()
def clean_temporary_users():
"""Remove old temporary users"""
_now = now()
for user in User.objects.filter(attributes__saml__isnull=False):
sources = SAMLSource.objects.filter(
pk=user.attributes.get("saml", {}).get("source", "")
)
if not sources.exists():
LOGGER.warning(
"User has an invalid SAML Source and won't be deleted!", user=user
)
source = sources.first()
source_delta = timedelta_from_string(source.temporary_user_delete_after)
if _now - user.last_login >= source_delta:
LOGGER.debug(
"User is expired and will be deleted.", user=user, delta=source_delta
)
user.delete()

View file

@ -1,5 +1,6 @@
"""saml sp views"""
from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.utils.decorators import method_decorator
@ -35,7 +36,7 @@ class InitiateView(View):
request.session["sso_destination"] = relay_state
parameters = {
"ACS_URL": build_full_url("acs", request, source),
"DESTINATION": source.idp_url,
"DESTINATION": source.sso_url,
"AUTHN_REQUEST_ID": get_random_id(),
"ISSUE_INSTANT": get_time_string(),
"ISSUER": get_issuer(request, source),
@ -44,14 +45,14 @@ class InitiateView(View):
if source.binding_type == SAMLBindingTypes.Redirect:
_request = deflate_and_base64_encode(authn_req.encode())
url_args = urlencode({"SAMLRequest": _request, "RelayState": relay_state})
return redirect(f"{source.idp_url}?{url_args}")
return redirect(f"{source.sso_url}?{url_args}")
if source.binding_type == SAMLBindingTypes.POST:
_request = nice64(authn_req.encode())
return render(
request,
"saml/sp/login.html",
{
"request_url": source.idp_url,
"request_url": source.sso_url,
"request": _request,
"relay_state": relay_state,
"source": source,
@ -83,11 +84,12 @@ class ACSView(View):
return bad_request_message(request, str(exc))
class SLOView(View):
class SLOView(LoginRequiredMixin, View):
"""Single-Logout-View"""
def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
"""Replies with an XHTML SSO Request."""
# TODO: Replace with flows
source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
if not source.enabled:
raise Http404
@ -95,10 +97,7 @@ class SLOView(View):
return render(
request,
"saml/sp/sso_single_logout.html",
{
"idp_logout_url": source.idp_logout_url,
"autosubmit": source.auto_logout,
},
{"idp_logout_url": source.slo_url,},
)